Skip to content

Commit

Permalink
added optional looping mcap playback
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasw committed Sep 3, 2024
1 parent 62ba170 commit 1710a32
Showing 1 changed file with 114 additions and 54 deletions.
168 changes: 114 additions & 54 deletions mcap_tools/src/mcap_play.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use regex::Regex;
use roslibrust::ros1::PublisherAny;
use std::collections::HashMap;
use std::time::SystemTime;
use tokio::sync::broadcast;

fn f64_secs_to_local_datetime(secs: f64) -> DateTime<chrono::prelude::Local> {
let d = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs_f64(secs);
Expand Down Expand Up @@ -48,7 +49,7 @@ async fn mcap_playback_init(
mapped: &Mmap,
include_re: &Option<Regex>,
exclude_re: &Option<Regex>,
) -> Result<(HashMap<String, PublisherAny>, f64), anyhow::Error> {
) -> Result<(HashMap<String, PublisherAny>, f64, f64), anyhow::Error> {
let mut pubs = HashMap::new();

// TODO(lucasw) could create every publisher from the summary
Expand Down Expand Up @@ -95,14 +96,14 @@ async fn mcap_playback_init(
}
} // loop through channels

let msg_t0 = summary
let stats = summary
.stats
.ok_or(anyhow::anyhow!("{mcap_name} no stats"))?
.message_start_time as f64
/ 1e9;
log::info!("{mcap_name} start time {}", msg_t0);
.ok_or(anyhow::anyhow!("{mcap_name} no stats"))?;
let msg_t0 = stats.message_start_time as f64 / 1e9;
let msg_t1 = stats.message_end_time as f64 / 1e9;
log::info!("{mcap_name} start time {msg_t0}, end time {msg_t1}");

Ok((pubs, msg_t0))
Ok((pubs, msg_t0, msg_t1))
}

#[tokio::main]
Expand All @@ -122,6 +123,13 @@ async fn main() -> Result<(), anyhow::Error> {

// non-ros cli args
let matches = command!()
.arg(
arg!(
-l --loop <LOOP> "loop playback n time, set to zero to loop without end"
)
.value_parser(clap::value_parser!(u64))
.required(false)
)
.arg(
arg!(
-e --regex <REGEX> "match topics using regular expressions"
Expand All @@ -142,6 +150,10 @@ async fn main() -> Result<(), anyhow::Error> {
)
.get_matches_from(unused_args);

// convert Option(&u64) to Option(u64) with the copied()
let max_loops = matches.get_one::<u64>("loop").copied();
log::info!("max loop {max_loops:?}");

let include_re;
if let Some(regex_str) = matches.get_one::<String>("regex") {
include_re = Some(Regex::new(regex_str)?);
Expand Down Expand Up @@ -170,6 +182,9 @@ async fn main() -> Result<(), anyhow::Error> {
let mcap_names: Vec<String> = mcap_names.iter().map(|s| (**s).clone()).collect();
log::info!("{mcap_names:?}");

let mut msg_t_start = f64::MAX;
let mut msg_t_end: f64 = 0.0;

let nh = roslibrust::ros1::NodeHandle::new(&master_uri, &full_node_name).await?;
let play_data = {
let mut play_data = Vec::new();
Expand All @@ -180,36 +195,64 @@ async fn main() -> Result<(), anyhow::Error> {
mcap_name.replace(['/', '.'], "_")
);
log::info!("{ind} opening '{mcap_name}' for playback with inner node {nh_name}");
// TODO(lucasw) create multiple node handles per thread didn't work, they didn't publish properly
// let nh = roslibrust::ros1::NodeHandle::new(&master_uri, &nh_name).await?;
let mapped = misc::map_mcap(mcap_name)?;

// initialize the start times and publishers
let (pubs, msg_t0) =
let (pubs, msg_t0, msg_t1) =
mcap_playback_init(&nh, mcap_name, &mapped, &include_re, &exclude_re).await?;

play_data.push((mcap_name.clone(), mapped, /* nh,*/ pubs, msg_t0));
msg_t_start = msg_t_start.min(msg_t0);
msg_t_end = msg_t_end.max(msg_t1);

play_data.push((mcap_name.clone(), mapped, pubs, msg_t0, msg_t1));
}
play_data
};

{
let mut handles = Vec::new();
let wall_t0 = get_wall_time();
for (mcap_name, mapped, /*_nh, */ pubs, msg_t0) in play_data {
// TODO(lucasw) also want to be able to pause playback
let (tx, mut _rx0) = broadcast::channel(4);
for (mcap_name, mapped, pubs, msg_t0, _msg_t1) in play_data {
// let msg_t_start = msg_t_start.clone();
let rx = tx.subscribe();
let handle = tokio::spawn(async move {
log::info!(
"first message time {msg_t0:.3}s ({:?}), wall time {wall_t0:.3}",
"first message time {msg_t0:.3}s ({:?})",
f64_secs_to_local_datetime(msg_t0),
);

// TODO(lucasw) loop optionally
play(&mcap_name, &mapped, &pubs, msg_t0, wall_t0)
play(&mcap_name, &mapped, &pubs, msg_t_start, rx, max_loops)
.await
.unwrap();
});
handles.push(handle);
}

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let mut loop_count = 0;
loop {
let wall_t0 = get_wall_time();
log::info!("{loop_count} send wall start time to play threads {wall_t0:.3}");
tx.send(wall_t0)?;

let duration = msg_t_end - msg_t_start;
log::info!("{loop_count} waiting for {duration:.3}s for playback to end");
tokio::time::sleep(tokio::time::Duration::from_secs((duration + 1.0) as u64)).await;

loop_count += 1;
if let Some(max_loops) = max_loops {
if max_loops > 0 && loop_count >= max_loops {
break;
}
} else {
break;
}
}

log::info!("waiting for {} tokio handles", handles.len());
for handle in handles {
handle.await?;
Expand All @@ -225,53 +268,70 @@ async fn play(
mapped: &Mmap,
pubs: &HashMap<String, PublisherAny>,
msg_t0: f64,
wall_t0: f64,
mut wall_t0_rx: broadcast::Receiver<f64>,
max_loops: Option<u64>,
) -> Result<(), anyhow::Error> {
// message timestamps of first message published, the message time and the wall clock time

let mut count = 0;
for message_raw in mcap::MessageStream::new(mapped)? {
match message_raw {
Ok(message) => {
let channel = message.channel;

// TODO(lucasw) unnecessary with pubs.get below?
if !pubs.contains_key(&channel.topic) {
continue;
}

let msg_with_header = misc::get_message_data_with_header(message.data);
if let Some(publisher) = pubs.get(&channel.topic) {
let msg_time = message.log_time as f64 / 1e9;
let wall_time = get_wall_time();

let msg_elapsed = msg_time - msg_t0;
let wall_elapsed = wall_time - wall_t0;
// if dt < 0.0 then playback is lagging behind the wallclock
// need to play back messages as fast as possible without sleeping
// until caught up
let dt = msg_elapsed - wall_elapsed;
if dt > 0.0 {
tokio::time::sleep(tokio::time::Duration::from_millis(
(dt * 1000.0) as u64,
))
.await;
let mut loop_count = 0;
loop {
// TODO(lucasw) if a new wall_t0 is received mid playback it should interrupt it
// and start over if needed, which means we need to try_recv on wall_t0_rx after every message
// publish and periodically while sleeping between messages
let wall_t0 = wall_t0_rx.recv().await?;
log::info!("{loop_count} {mcap_name} wall_t0 received {wall_t0}");

let mut count = 0;
for message_raw in mcap::MessageStream::new(mapped)? {
match message_raw {
Ok(message) => {
let channel = message.channel;

// TODO(lucasw) unnecessary with pubs.get below?
if !pubs.contains_key(&channel.topic) {
continue;
}

let _ = publisher.publish(&msg_with_header).await;

count += 1;
log::debug!("{mcap_name} {count} {} publish", channel.topic);
// TODO(lucasw) publish a clock message
let msg_with_header = misc::get_message_data_with_header(message.data);
if let Some(publisher) = pubs.get(&channel.topic) {
let msg_time = message.log_time as f64 / 1e9;
let wall_time = get_wall_time();

let msg_elapsed = msg_time - msg_t0;
let wall_elapsed = wall_time - wall_t0;
// if dt < 0.0 then playback is lagging behind the wallclock
// need to play back messages as fast as possible without sleeping
// until caught up
let dt = msg_elapsed - wall_elapsed;
// TODO(lucasw) if dt is too negative then skip publishing until caught up?
if dt > 0.0 {
tokio::time::sleep(tokio::time::Duration::from_millis(
(dt * 1000.0) as u64,
))
.await;
}

let _ = publisher.publish(&msg_with_header).await;

count += 1;
log::debug!("{loop_count} {mcap_name} {count} {} publish", channel.topic);
// TODO(lucasw) publish a clock message
}
}
Err(e) => {
log::warn!("{mcap_name} {:?}", e);
}
}
Err(e) => {
log::warn!("{mcap_name} {:?}", e);
} // loop through all messages

log::info!("{loop_count} {mcap_name} published {count} messages");
loop_count += 1;
if let Some(max_loops) = max_loops {
if max_loops > 0 && loop_count >= max_loops {
break;
}
} else {
break;
}
} // loop through all messages

log::info!("{mcap_name} published {count} messages");
}

Ok(())
}

0 comments on commit 1710a32

Please sign in to comment.