From ba8a9eaf977ff4bc16da36832c1b5a9e6fffc55e Mon Sep 17 00:00:00 2001 From: Lucas Walter Date: Sun, 15 Sep 2024 11:01:50 -0700 Subject: [PATCH] aggregate tf static messages before normal playback, which eliminates the bug with starting playback after the statics were stamped --- mcap_tools/src/mcap_play.rs | 108 +++++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 40 deletions(-) diff --git a/mcap_tools/src/mcap_play.rs b/mcap_tools/src/mcap_play.rs index be6091e..4c97a39 100644 --- a/mcap_tools/src/mcap_play.rs +++ b/mcap_tools/src/mcap_play.rs @@ -33,14 +33,14 @@ fn use_topic(topic: &str, include_re: &Option, exclude_re: &Option // TODO(lucasw) topic_type matching would be useful as well if let Some(ref re) = include_re { if re.captures(topic).is_none() { - log::debug!("not recording from {topic}"); + log::debug!("not playing back {topic}"); return false; } } if let Some(ref re) = exclude_re { if re.captures(topic).is_some() { - log::info!("excluding recording from {topic}"); + log::info!("excluding playing back {topic}"); return false; } } @@ -62,7 +62,7 @@ async fn mcap_playback_init( include_re: &Option, exclude_re: &Option, remaps: &HashMap, -) -> Result<(HashMap, f64, f64), anyhow::Error> { +) -> Result<(HashMap, tf2_msgs::TFMessage, f64, f64), anyhow::Error> { // TODO(lucasw) could create every publisher from the summary let summary = mcap::read::Summary::read(mapped)?; // let summary = summary.ok_or(Err(anyhow::anyhow!("no summary")))?; @@ -83,6 +83,7 @@ async fn mcap_playback_init( msg_t1 - msg_t0 ); + let mut tf_static_aggregated = tf2_msgs::TFMessage::default(); let mut pubs = HashMap::new(); for channel in summary.channels.values() { @@ -95,6 +96,42 @@ async fn mcap_playback_init( log::warn!("{mcap_name} {}", channel.message_encoding); continue; } + + // ugly special case to handle tf static aggregation + // TODO(lucasw) this maybe be a little funny if the mcap contains + // multiple static transforms with the same parent and child, could + // scan through them and only take the most recent, or build a hash map here + if channel.topic == "/tf_static" { + // && channel.schema.unwrap().name == "tf2_msgs/TFMessage" { + // TODO(lucasw) how to get through an mcap as quickly as possible to get a single + // topic? The easiest thing would be to save tf_static to a separate mcap in the + // first place, more advanced would be for mcap_record to put all the statics + // in a separate chunk but then 'mcap convert' wouldn't do that. + for message_raw in mcap::MessageStream::new(mapped)? { + if let Ok(message) = message_raw { + if message.channel.topic != "/tf_static" { + continue; + } + let msg_with_header = misc::get_message_data_with_header(message.data); + match serde_rosmsg::from_slice::(&msg_with_header) { + Ok(tf_msg) => { + log::info!( + "{mcap_name} adding {} transforms to tf_static, total {}\r", + tf_msg.transforms.len(), + tf_msg.transforms.len() + tf_static_aggregated.transforms.len(), + ); + for transform in tf_msg.transforms { + tf_static_aggregated.transforms.push(transform); + } + } + Err(err) => { + log::error!("{mcap_name} {err:?}"); + } + } + } + } + } + if let Some(schema) = &channel.schema { if schema.encoding != "ros1msg" { // TODO(lucasw) warn on first occurrence @@ -138,7 +175,7 @@ async fn mcap_playback_init( } } // loop through channels - Ok((pubs, msg_t0, msg_t1)) + Ok((pubs, tf_static_aggregated, msg_t0, msg_t1)) } fn get_non_ros_cli_args( @@ -270,10 +307,11 @@ async fn main() -> Result<(), anyhow::Error> { }); let nh = roslibrust::ros1::NodeHandle::new(&master_uri, &full_node_name).await?; - let (msg_t_start, msg_t_end, play_data) = { + let (msg_t_start, tf_static_aggregated, msg_t_end, play_data) = { let mut msg_t_start = f64::MAX; let mut msg_t_end: f64 = 0.0; let mut play_data = Vec::new(); + let mut tf_static_aggregated = tf2_msgs::TFMessage::default(); for (ind, mcap_name) in mcap_names.iter().enumerate() { let nh_name = format!( @@ -290,10 +328,14 @@ async fn main() -> Result<(), anyhow::Error> { .await; match rv { - Ok((pubs, msg_t0, msg_t1)) => { + Ok((pubs, tf_statics, msg_t0, msg_t1)) => { msg_t_start = msg_t_start.min(msg_t0); msg_t_end = msg_t_end.max(msg_t1); play_data.push((mcap_name.clone(), mapped, pubs, msg_t0, msg_t1)); + + for transform in tf_statics.transforms { + tf_static_aggregated.transforms.push(transform); + } } Err(err) => { log::error!("{err:?}"); @@ -311,9 +353,27 @@ async fn main() -> Result<(), anyhow::Error> { if play_data.len() == 0 { panic!("no mcaps successfully loaded"); } - (msg_t_start, msg_t_end, play_data) + (msg_t_start, tf_static_aggregated, msg_t_end, play_data) }; + // publish all teh tf statics from all the mcaps together in one latched message + // unless we're excluding tf_static + if !tf_static_aggregated.transforms.is_empty() { + let topic = { + match remaps.get("/tf_static") { + Some(topic) => topic.to_string(), + None => "/tf_static".to_string(), + } + }; + let latching = true; + let static_publisher = nh + .advertise::(&topic, 10, latching) + .await?; + // Does having this go out of scope make it not work? + // No apparently not + static_publisher.publish(&tf_static_aggregated).await?; + } + { enable_raw_mode()?; @@ -508,12 +568,6 @@ async fn play( get_wall_time() - wall_t0 ); - // TODO(lucasw) make this generic somehow? In the case of tf_static and TFMessage we can - // aggregate all the static messages into one TFMessage, but what about other types of - // messages that are on static publishers? (Any use cases that make that more worthy - // of attention?) - let mut tf_static_aggregated = tf2_msgs::TFMessage::default(); - let mut count = 0; for message_raw in mcap::MessageStream::new(mapped)? { match message_raw { @@ -595,33 +649,7 @@ async fn play( break; } - // ugly special case to handle tf static aggregation - // TODO(lucasW) this maybe be a little funny if the mcap contains - // multiple static transforms with the same parent and child - if channel.topic == "/tf_static" { - // && channel.schema.unwrap().name == "tf2_msgs/TFMessage" { - match serde_rosmsg::from_slice::(&msg_with_header) - { - Ok(tf_msg) => { - log::info!( - "{loop_count} {mcap_name} adding {} transforms to tf_static, total{}\r", - tf_msg.transforms.len(), - tf_msg.transforms.len() + tf_static_aggregated.transforms.len(), - ); - for transform in tf_msg.transforms { - tf_static_aggregated.transforms.push(transform); - } - - let data = serde_rosmsg::to_vec(&tf_static_aggregated).unwrap(); - let _ = publisher.publish(&data).await; - } - Err(err) => { - log::error!("{loop_count} {mcap_name} {err:?}"); - } - } - } else { - let _ = publisher.publish(&msg_with_header).await; - } + let _ = publisher.publish(&msg_with_header).await; count += 1; log::debug!(