Skip to content

Commit

Permalink
aggregate tf static messages before normal playback, which eliminates…
Browse files Browse the repository at this point in the history
… the bug with starting playback after the statics were stamped
  • Loading branch information
lucasw committed Sep 15, 2024
1 parent 5f94bb8 commit ba8a9ea
Showing 1 changed file with 68 additions and 40 deletions.
108 changes: 68 additions & 40 deletions mcap_tools/src/mcap_play.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ fn use_topic(topic: &str, include_re: &Option<Regex>, exclude_re: &Option<Regex>
// TODO(lucasw) topic_type matching would be useful as well
if let Some(ref re) = include_re {
if re.captures(topic).is_none() {
log::debug!("not recording from {topic}");
log::debug!("not playing back {topic}");
return false;
}
}

if let Some(ref re) = exclude_re {
if re.captures(topic).is_some() {
log::info!("excluding recording from {topic}");
log::info!("excluding playing back {topic}");
return false;
}
}
Expand All @@ -62,7 +62,7 @@ async fn mcap_playback_init(
include_re: &Option<Regex>,
exclude_re: &Option<Regex>,
remaps: &HashMap<String, String>,
) -> Result<(HashMap<String, PublisherAny>, f64, f64), anyhow::Error> {
) -> Result<(HashMap<String, PublisherAny>, tf2_msgs::TFMessage, f64, f64), anyhow::Error> {
// TODO(lucasw) could create every publisher from the summary
let summary = mcap::read::Summary::read(mapped)?;
// let summary = summary.ok_or(Err(anyhow::anyhow!("no summary")))?;
Expand All @@ -83,6 +83,7 @@ async fn mcap_playback_init(
msg_t1 - msg_t0
);

let mut tf_static_aggregated = tf2_msgs::TFMessage::default();
let mut pubs = HashMap::new();

for channel in summary.channels.values() {
Expand All @@ -95,6 +96,42 @@ async fn mcap_playback_init(
log::warn!("{mcap_name} {}", channel.message_encoding);
continue;
}

// ugly special case to handle tf static aggregation
// TODO(lucasw) this maybe be a little funny if the mcap contains
// multiple static transforms with the same parent and child, could
// scan through them and only take the most recent, or build a hash map here
if channel.topic == "/tf_static" {
// && channel.schema.unwrap().name == "tf2_msgs/TFMessage" {
// TODO(lucasw) how to get through an mcap as quickly as possible to get a single
// topic? The easiest thing would be to save tf_static to a separate mcap in the
// first place, more advanced would be for mcap_record to put all the statics
// in a separate chunk but then 'mcap convert' wouldn't do that.
for message_raw in mcap::MessageStream::new(mapped)? {
if let Ok(message) = message_raw {
if message.channel.topic != "/tf_static" {
continue;
}
let msg_with_header = misc::get_message_data_with_header(message.data);
match serde_rosmsg::from_slice::<tf2_msgs::TFMessage>(&msg_with_header) {
Ok(tf_msg) => {
log::info!(
"{mcap_name} adding {} transforms to tf_static, total {}\r",
tf_msg.transforms.len(),
tf_msg.transforms.len() + tf_static_aggregated.transforms.len(),
);
for transform in tf_msg.transforms {
tf_static_aggregated.transforms.push(transform);
}
}
Err(err) => {
log::error!("{mcap_name} {err:?}");
}
}
}
}
}

if let Some(schema) = &channel.schema {
if schema.encoding != "ros1msg" {
// TODO(lucasw) warn on first occurrence
Expand Down Expand Up @@ -138,7 +175,7 @@ async fn mcap_playback_init(
}
} // loop through channels

Ok((pubs, msg_t0, msg_t1))
Ok((pubs, tf_static_aggregated, msg_t0, msg_t1))
}

fn get_non_ros_cli_args(
Expand Down Expand Up @@ -270,10 +307,11 @@ async fn main() -> Result<(), anyhow::Error> {
});

let nh = roslibrust::ros1::NodeHandle::new(&master_uri, &full_node_name).await?;
let (msg_t_start, msg_t_end, play_data) = {
let (msg_t_start, tf_static_aggregated, msg_t_end, play_data) = {
let mut msg_t_start = f64::MAX;
let mut msg_t_end: f64 = 0.0;
let mut play_data = Vec::new();
let mut tf_static_aggregated = tf2_msgs::TFMessage::default();

for (ind, mcap_name) in mcap_names.iter().enumerate() {
let nh_name = format!(
Expand All @@ -290,10 +328,14 @@ async fn main() -> Result<(), anyhow::Error> {
.await;

match rv {
Ok((pubs, msg_t0, msg_t1)) => {
Ok((pubs, tf_statics, msg_t0, msg_t1)) => {
msg_t_start = msg_t_start.min(msg_t0);
msg_t_end = msg_t_end.max(msg_t1);
play_data.push((mcap_name.clone(), mapped, pubs, msg_t0, msg_t1));

for transform in tf_statics.transforms {
tf_static_aggregated.transforms.push(transform);
}
}
Err(err) => {
log::error!("{err:?}");
Expand All @@ -311,9 +353,27 @@ async fn main() -> Result<(), anyhow::Error> {
if play_data.len() == 0 {
panic!("no mcaps successfully loaded");
}
(msg_t_start, msg_t_end, play_data)
(msg_t_start, tf_static_aggregated, msg_t_end, play_data)
};

// publish all teh tf statics from all the mcaps together in one latched message
// unless we're excluding tf_static
if !tf_static_aggregated.transforms.is_empty() {
let topic = {
match remaps.get("/tf_static") {
Some(topic) => topic.to_string(),
None => "/tf_static".to_string(),
}
};
let latching = true;
let static_publisher = nh
.advertise::<tf2_msgs::TFMessage>(&topic, 10, latching)
.await?;
// Does having this go out of scope make it not work?
// No apparently not
static_publisher.publish(&tf_static_aggregated).await?;
}

{
enable_raw_mode()?;

Expand Down Expand Up @@ -508,12 +568,6 @@ async fn play(
get_wall_time() - wall_t0
);

// TODO(lucasw) make this generic somehow? In the case of tf_static and TFMessage we can
// aggregate all the static messages into one TFMessage, but what about other types of
// messages that are on static publishers? (Any use cases that make that more worthy
// of attention?)
let mut tf_static_aggregated = tf2_msgs::TFMessage::default();

let mut count = 0;
for message_raw in mcap::MessageStream::new(mapped)? {
match message_raw {
Expand Down Expand Up @@ -595,33 +649,7 @@ async fn play(
break;
}

// ugly special case to handle tf static aggregation
// TODO(lucasW) this maybe be a little funny if the mcap contains
// multiple static transforms with the same parent and child
if channel.topic == "/tf_static" {
// && channel.schema.unwrap().name == "tf2_msgs/TFMessage" {
match serde_rosmsg::from_slice::<tf2_msgs::TFMessage>(&msg_with_header)
{
Ok(tf_msg) => {
log::info!(
"{loop_count} {mcap_name} adding {} transforms to tf_static, total{}\r",
tf_msg.transforms.len(),
tf_msg.transforms.len() + tf_static_aggregated.transforms.len(),
);
for transform in tf_msg.transforms {
tf_static_aggregated.transforms.push(transform);
}

let data = serde_rosmsg::to_vec(&tf_static_aggregated).unwrap();
let _ = publisher.publish(&data).await;
}
Err(err) => {
log::error!("{loop_count} {mcap_name} {err:?}");
}
}
} else {
let _ = publisher.publish(&msg_with_header).await;
}
let _ = publisher.publish(&msg_with_header).await;

count += 1;
log::debug!(
Expand Down

0 comments on commit ba8a9ea

Please sign in to comment.