From 1710a324892d1ac250a69b254c0d2db2460487db Mon Sep 17 00:00:00 2001 From: Lucas Walter Date: Tue, 3 Sep 2024 06:34:14 -0700 Subject: [PATCH] added optional looping mcap playback --- mcap_tools/src/mcap_play.rs | 168 ++++++++++++++++++++++++------------ 1 file changed, 114 insertions(+), 54 deletions(-) diff --git a/mcap_tools/src/mcap_play.rs b/mcap_tools/src/mcap_play.rs index 324c072..aaf578f 100644 --- a/mcap_tools/src/mcap_play.rs +++ b/mcap_tools/src/mcap_play.rs @@ -8,6 +8,7 @@ use regex::Regex; use roslibrust::ros1::PublisherAny; use std::collections::HashMap; use std::time::SystemTime; +use tokio::sync::broadcast; fn f64_secs_to_local_datetime(secs: f64) -> DateTime { let d = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs_f64(secs); @@ -48,7 +49,7 @@ async fn mcap_playback_init( mapped: &Mmap, include_re: &Option, exclude_re: &Option, -) -> Result<(HashMap, f64), anyhow::Error> { +) -> Result<(HashMap, f64, f64), anyhow::Error> { let mut pubs = HashMap::new(); // TODO(lucasw) could create every publisher from the summary @@ -95,14 +96,14 @@ async fn mcap_playback_init( } } // loop through channels - let msg_t0 = summary + let stats = summary .stats - .ok_or(anyhow::anyhow!("{mcap_name} no stats"))? - .message_start_time as f64 - / 1e9; - log::info!("{mcap_name} start time {}", msg_t0); + .ok_or(anyhow::anyhow!("{mcap_name} no stats"))?; + let msg_t0 = stats.message_start_time as f64 / 1e9; + let msg_t1 = stats.message_end_time as f64 / 1e9; + log::info!("{mcap_name} start time {msg_t0}, end time {msg_t1}"); - Ok((pubs, msg_t0)) + Ok((pubs, msg_t0, msg_t1)) } #[tokio::main] @@ -122,6 +123,13 @@ async fn main() -> Result<(), anyhow::Error> { // non-ros cli args let matches = command!() + .arg( + arg!( + -l --loop "loop playback n time, set to zero to loop without end" + ) + .value_parser(clap::value_parser!(u64)) + .required(false) + ) .arg( arg!( -e --regex "match topics using regular expressions" @@ -142,6 +150,10 @@ async fn main() -> Result<(), anyhow::Error> { ) .get_matches_from(unused_args); + // convert Option(&u64) to Option(u64) with the copied() + let max_loops = matches.get_one::("loop").copied(); + log::info!("max loop {max_loops:?}"); + let include_re; if let Some(regex_str) = matches.get_one::("regex") { include_re = Some(Regex::new(regex_str)?); @@ -170,6 +182,9 @@ async fn main() -> Result<(), anyhow::Error> { let mcap_names: Vec = mcap_names.iter().map(|s| (**s).clone()).collect(); log::info!("{mcap_names:?}"); + let mut msg_t_start = f64::MAX; + let mut msg_t_end: f64 = 0.0; + let nh = roslibrust::ros1::NodeHandle::new(&master_uri, &full_node_name).await?; let play_data = { let mut play_data = Vec::new(); @@ -180,36 +195,64 @@ async fn main() -> Result<(), anyhow::Error> { mcap_name.replace(['/', '.'], "_") ); log::info!("{ind} opening '{mcap_name}' for playback with inner node {nh_name}"); + // TODO(lucasw) create multiple node handles per thread didn't work, they didn't publish properly // let nh = roslibrust::ros1::NodeHandle::new(&master_uri, &nh_name).await?; let mapped = misc::map_mcap(mcap_name)?; // initialize the start times and publishers - let (pubs, msg_t0) = + let (pubs, msg_t0, msg_t1) = mcap_playback_init(&nh, mcap_name, &mapped, &include_re, &exclude_re).await?; - play_data.push((mcap_name.clone(), mapped, /* nh,*/ pubs, msg_t0)); + msg_t_start = msg_t_start.min(msg_t0); + msg_t_end = msg_t_end.max(msg_t1); + + play_data.push((mcap_name.clone(), mapped, pubs, msg_t0, msg_t1)); } play_data }; { let mut handles = Vec::new(); - let wall_t0 = get_wall_time(); - for (mcap_name, mapped, /*_nh, */ pubs, msg_t0) in play_data { + // TODO(lucasw) also want to be able to pause playback + let (tx, mut _rx0) = broadcast::channel(4); + for (mcap_name, mapped, pubs, msg_t0, _msg_t1) in play_data { + // let msg_t_start = msg_t_start.clone(); + let rx = tx.subscribe(); let handle = tokio::spawn(async move { log::info!( - "first message time {msg_t0:.3}s ({:?}), wall time {wall_t0:.3}", + "first message time {msg_t0:.3}s ({:?})", f64_secs_to_local_datetime(msg_t0), ); - // TODO(lucasw) loop optionally - play(&mcap_name, &mapped, &pubs, msg_t0, wall_t0) + play(&mcap_name, &mapped, &pubs, msg_t_start, rx, max_loops) .await .unwrap(); }); handles.push(handle); } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let mut loop_count = 0; + loop { + let wall_t0 = get_wall_time(); + log::info!("{loop_count} send wall start time to play threads {wall_t0:.3}"); + tx.send(wall_t0)?; + + let duration = msg_t_end - msg_t_start; + log::info!("{loop_count} waiting for {duration:.3}s for playback to end"); + tokio::time::sleep(tokio::time::Duration::from_secs((duration + 1.0) as u64)).await; + + loop_count += 1; + if let Some(max_loops) = max_loops { + if max_loops > 0 && loop_count >= max_loops { + break; + } + } else { + break; + } + } + log::info!("waiting for {} tokio handles", handles.len()); for handle in handles { handle.await?; @@ -225,53 +268,70 @@ async fn play( mapped: &Mmap, pubs: &HashMap, msg_t0: f64, - wall_t0: f64, + mut wall_t0_rx: broadcast::Receiver, + max_loops: Option, ) -> Result<(), anyhow::Error> { - // message timestamps of first message published, the message time and the wall clock time - - let mut count = 0; - for message_raw in mcap::MessageStream::new(mapped)? { - match message_raw { - Ok(message) => { - let channel = message.channel; - - // TODO(lucasw) unnecessary with pubs.get below? - if !pubs.contains_key(&channel.topic) { - continue; - } - - let msg_with_header = misc::get_message_data_with_header(message.data); - if let Some(publisher) = pubs.get(&channel.topic) { - let msg_time = message.log_time as f64 / 1e9; - let wall_time = get_wall_time(); - - let msg_elapsed = msg_time - msg_t0; - let wall_elapsed = wall_time - wall_t0; - // if dt < 0.0 then playback is lagging behind the wallclock - // need to play back messages as fast as possible without sleeping - // until caught up - let dt = msg_elapsed - wall_elapsed; - if dt > 0.0 { - tokio::time::sleep(tokio::time::Duration::from_millis( - (dt * 1000.0) as u64, - )) - .await; + let mut loop_count = 0; + loop { + // TODO(lucasw) if a new wall_t0 is received mid playback it should interrupt it + // and start over if needed, which means we need to try_recv on wall_t0_rx after every message + // publish and periodically while sleeping between messages + let wall_t0 = wall_t0_rx.recv().await?; + log::info!("{loop_count} {mcap_name} wall_t0 received {wall_t0}"); + + let mut count = 0; + for message_raw in mcap::MessageStream::new(mapped)? { + match message_raw { + Ok(message) => { + let channel = message.channel; + + // TODO(lucasw) unnecessary with pubs.get below? + if !pubs.contains_key(&channel.topic) { + continue; } - let _ = publisher.publish(&msg_with_header).await; - - count += 1; - log::debug!("{mcap_name} {count} {} publish", channel.topic); - // TODO(lucasw) publish a clock message + let msg_with_header = misc::get_message_data_with_header(message.data); + if let Some(publisher) = pubs.get(&channel.topic) { + let msg_time = message.log_time as f64 / 1e9; + let wall_time = get_wall_time(); + + let msg_elapsed = msg_time - msg_t0; + let wall_elapsed = wall_time - wall_t0; + // if dt < 0.0 then playback is lagging behind the wallclock + // need to play back messages as fast as possible without sleeping + // until caught up + let dt = msg_elapsed - wall_elapsed; + // TODO(lucasw) if dt is too negative then skip publishing until caught up? + if dt > 0.0 { + tokio::time::sleep(tokio::time::Duration::from_millis( + (dt * 1000.0) as u64, + )) + .await; + } + + let _ = publisher.publish(&msg_with_header).await; + + count += 1; + log::debug!("{loop_count} {mcap_name} {count} {} publish", channel.topic); + // TODO(lucasw) publish a clock message + } + } + Err(e) => { + log::warn!("{mcap_name} {:?}", e); } } - Err(e) => { - log::warn!("{mcap_name} {:?}", e); + } // loop through all messages + + log::info!("{loop_count} {mcap_name} published {count} messages"); + loop_count += 1; + if let Some(max_loops) = max_loops { + if max_loops > 0 && loop_count >= max_loops { + break; } + } else { + break; } - } // loop through all messages - - log::info!("{mcap_name} published {count} messages"); + } Ok(()) }