Skip to content

Commit

Permalink
rename to flexbe_status_listener with heartbeat and sync updates; use…
Browse files Browse the repository at this point in the history
… latched state_map topic
  • Loading branch information
David Conner committed Sep 26, 2024
1 parent 81fd3a3 commit 73904d1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 37 deletions.
4 changes: 3 additions & 1 deletion flexbe_onboard/flexbe_onboard/flexbe_onboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ def __init__(self):
# only at onboard level
self._heartbeat_pub = self.create_publisher(BehaviorSync, Topics._ONBOARD_HEARTBEAT_TOPIC, 10)
self._status_pub = self.create_publisher(BEStatus, Topics._ONBOARD_STATUS_TOPIC, 10)
self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_TOPIC, 2)

# Latch state map so we can retreive later if desired
latching_qos = QoSProfile(depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL)
self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_TOPIC, qos_profile=latching_qos)

self._version_sub = self.create_subscription(String, Topics._UI_VERSION_TOPIC,
self._version_callback, qos_profile=latching_qos)

Expand Down
2 changes: 1 addition & 1 deletion flexbe_widget/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ install(PROGRAMS
bin/breakpoint
bin/create_repo
bin/evaluate_logs
bin/flexbe_outcome_listener
bin/flexbe_status_listener
DESTINATION lib/${PROJECT_NAME}
)

Expand Down
6 changes: 0 additions & 6 deletions flexbe_widget/bin/flexbe_outcome_listener

This file was deleted.

6 changes: 6 additions & 0 deletions flexbe_widget/bin/flexbe_status_listener
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python3

from flexbe_widget.flexbe_status_listener import flexbe_status_listener_main

if __name__ == '__main__':
flexbe_status_listener_main()
6 changes: 5 additions & 1 deletion flexbe_widget/flexbe_widget/behavior_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSProfile

from rosidl_runtime_py import get_interface_path

Expand Down Expand Up @@ -78,7 +79,10 @@ def __init__(self):
self._status_pub = self.create_publisher(BEStatus, Topics._ONBOARD_STATUS_TOPIC, 100)
self._mirror_pub = self.create_publisher(ContainerStructure, Topics._MIRROR_STRUCTURE_TOPIC, 100)
self._heartbeat_pub = self.create_publisher(Int32, Topics._LAUNCHER_HEARTBEAT_TOPIC, 2)
self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_OCS_TOPIC, 2)

# Latch state map so we can retreive later if desired
latching_qos = QoSProfile(depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL)
self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_OCS_TOPIC, latching_qos)

self._behavior_lib = BehaviorLibrary(self)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,52 +28,83 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import rclpy
from rclpy.node import Node

from flexbe_core.core import StateMap, Topics

class FlexbeOutcomeListener(Node):
"""Simple lister to echo FlexBE state machine information."""
def __init__(self):
super().__init__('flexbe_outcome_listener')

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy, QoSProfile

# Suppress the node name and severity information (ChatGPT led me astray)
# import logging
# formatter = logging.Formatter('[%(asctime)s] %(message)s')
# console_handler = logging.StreamHandler()
# console_handler.setFormatter(formatter)

# self.get_logger().handlers.clear()
# self.get_logger().addHandler(console_handler)
class FlexbeStatusListener(Node):
"""Simple listener to echo FlexBE state machine information."""

def __init__(self):
super().__init__('flexbe_status_listener')
latching_qos = QoSProfile(depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL)
self._state_map = None
self._state_map_sub = self.create_subscription(
Topics._topic_types[Topics._STATE_MAP_OCS_TOPIC],
Topics._STATE_MAP_OCS_TOPIC,
self._state_map_callback,
latching_qos)
self._state_map_sub # prevent unused variable warning

self._outcome_sub = self.create_subscription(
Topics._topic_types[Topics._OUTCOME_TOPIC],
Topics._OUTCOME_TOPIC,
self._outcome_callback,
10)
self._state_map_sub # prevent unused variable warning
self._outcome_sub # prevent unused variable warning

self._state_map_sub = self.create_subscription(
Topics._topic_types[Topics._STATE_MAP_OCS_TOPIC],
Topics._STATE_MAP_OCS_TOPIC,
self._state_map_callback,
self._heartbeat_sub = self.create_subscription(
Topics._topic_types[Topics._ONBOARD_HEARTBEAT_TOPIC],
Topics._ONBOARD_HEARTBEAT_TOPIC,
self._heartbeat_callback,
10)
self._state_map_sub # prevent unused variable warning
self._heartbeat_sub # prevent unused variable warning

self._sync_sub = self.create_subscription(
Topics._topic_types[Topics._MIRROR_SYNC_TOPIC],
Topics._MIRROR_SYNC_TOPIC,
self._sync_callback,
10)
self._sync_sub # prevent unused variable warning

def _heartbeat_callback(self, msg):
if self._state_map is None:
self.get_logger().info(f'Onboard heartbeat {msg}')
return

states = []
for checksum in msg.current_state_checksums:
state_id, _ = StateMap.unhash(checksum)
states.append(self._state_map.get(state_id, 'unknown'))

self.get_logger().info(f'Onboard heartbeat {msg.behavior_id:10d} : {states}')

def _sync_callback(self, msg):
if self._state_map is None:
self.get_logger().info(f'Synchronize mirror {msg}')
return

states = []
for checksum in msg.current_state_checksums:
state_id, _ = StateMap.unhash(checksum)
states.append(self._state_map.get(state_id, 'unknown'))

self.get_logger().info(f'Synchronize mirror {msg.behavior_id:10d} : {states}')

def _outcome_callback(self, msg):
if self._state_map is None:
self.get_logger().info(f"Outcome msg hash value {msg.data:11d}")
self.get_logger().info(f'Outcome msg hash value {msg.data:11d}')
return

state_id, outcome = StateMap.unhash(msg.data)
path = f"'{self._state_map.get(state_id, 'unknown')}'"
self.get_logger().info(f"Outcome {outcome:2d} from {state_id:11d} {path:60s}")
self.get_logger().info(f'Outcome {outcome:2d} from {state_id:11d} {path:60s}')

def _state_map_callback(self, msg):
self.get_logger().info(f"New state map received for {msg.behavior_id}")
self.get_logger().info(f'New state map received for {msg.behavior_id}')

state_map = {}
for id, path in zip(msg.state_ids, msg.state_paths):
Expand All @@ -82,13 +113,14 @@ def _state_map_callback(self, msg):

self._state_map = state_map

def flexbe_outcome_listener_main(args=None):

def flexbe_status_listener_main(args=None):
"""Run the status listener to echo state info given state id in messages."""
print('Suggest: export RCUTILS_CONSOLE_OUTPUT_FORMAT="[{time}] {message}" in terminal')
print(' Run this before starting behavior to get full state path informatio from state map.')
print(' Run this before starting behavior to get full state path information from state map.')
rclpy.init(args=args)

outcomes = FlexbeOutcomeListener()
outcomes = FlexbeStatusListener()

rclpy.spin(outcomes)

Expand All @@ -100,4 +132,4 @@ def flexbe_outcome_listener_main(args=None):


if __name__ == '__main__':
flexbe_outcome_listener_main()
flexbe_status_listener_main()

0 comments on commit 73904d1

Please sign in to comment.