diff --git a/mcap_tools/Cargo.toml b/mcap_tools/Cargo.toml index 26c2ceb..b918739 100644 --- a/mcap_tools/Cargo.toml +++ b/mcap_tools/Cargo.toml @@ -14,6 +14,7 @@ crossterm = { version = "0.28.1", features = ["event-stream"] } futures = "0.3.30" futures-timer = "3.0.3" log = "0.4.22" +log-once = "0.4.1" mcap = "0.9.1" memmap = "0.7.0" # rand = "0.8.5" @@ -41,7 +42,6 @@ branch = "subscribe_any" git = "https://github.com/lucasw/roslibrust" branch = "subscribe_any" # path = "../../roslibrust/roslibrust_codegen_macro" -# path = "../../../../../other/src/rust/roslibrust/roslibrust_codegen_macro" # version="0.10" [[bin]] diff --git a/mcap_tools/src/mcap_record.rs b/mcap_tools/src/mcap_record.rs index 994627e..abb6bf6 100644 --- a/mcap_tools/src/mcap_record.rs +++ b/mcap_tools/src/mcap_record.rs @@ -26,6 +26,7 @@ async fn mcap_record( full_node_name: String, prefix: String, channel_receiver: mpsc::Receiver<(String, String, String)>, + connection_summaries: Arc>>>, msg_receiver: mpsc::Receiver<(String, String, u64, u32, Vec)>, finish: Arc, num_received_messages: Arc, @@ -177,11 +178,31 @@ async fn mcap_record( let schema = schemas.lock().unwrap().get(&topic_type).unwrap().clone(); + // TODO(lucasw) use Entry + let connection_summary = { + match connection_summaries.lock().unwrap().get(&topic) { + Some(connection_summary) => { + log_once::debug_once!("{full_node_name} '{topic}' have summary {connection_summary:?}"); + connection_summary.clone() + } + None => { + log_once::warn_once!("{full_node_name} '{topic}' no connection summary for '{topic_type}' yet, leaving empty"); + BTreeMap::::default() + } + } + }; + + // TODO(lucasw) need to create a BTreeMap and at least store the + // md5sum and topic (though that's redundant with the channel topic) + // in it to match what is in bags converted to mcap + // by `mcap convert`, and if latching and callerid are available + // put those in metadata also. + let channel = mcap::Channel { topic: topic.clone(), schema, message_encoding: "ros1".to_string(), - metadata: BTreeMap::default(), + metadata: connection_summary, }; let channel_id = @@ -329,6 +350,8 @@ async fn main() -> Result<(), anyhow::Error> { let (channel_sender, channel_receiver) = mpsc::sync_channel(800); let (msg_sender, msg_receiver) = mpsc::sync_channel(24000); + let connection_summaries = Arc::new(Mutex::new(HashMap::new())); + // signal to stop recording- TODO(lucasw) should be a sync_channel? let finish = Arc::new(AtomicBool::new(false)); @@ -364,6 +387,7 @@ async fn main() -> Result<(), anyhow::Error> { full_node_name, prefix.to_string(), channel_receiver, + connection_summaries.clone(), msg_receiver, finish.clone(), num_received_messages.clone(), @@ -423,6 +447,7 @@ async fn main() -> Result<(), anyhow::Error> { let msg_sender = msg_sender.clone(); let channel_sender = channel_sender.clone(); let num_received_messages = num_received_messages.clone(); + let connection_summaries = connection_summaries.clone(); // let num_topics = topics.len(); @@ -446,30 +471,58 @@ async fn main() -> Result<(), anyhow::Error> { Ok(data) => { if channel_data.is_none() { log::debug!("get definition"); - let definition = nh.inner.get_definition(topic.clone()).await; - match definition { - Ok(definition) => { - log::debug!("definition: '{definition}'"); - let channel_data_inner = - (topic.clone(), topic_type.clone(), definition); - let send_rv = - channel_sender.send(channel_data_inner.clone()); - match send_rv { - Ok(()) => { - channel_data = Some(()); - // give some time for the receiver to process the - // new channel before sending a message below - tokio::time::sleep(Duration::from_millis(200)) - .await; - } - Err(e) => { - log::error!("{channel_data_inner:?} {arrival_ns_epoch} {e:?}"); - continue; - } - } + // TODO(lucasw) combine these calls, put the definition in the + // summary? + let connection_header = + nh.inner.get_connection_header(topic.clone()).await.unwrap(); + let definition = connection_header.msg_definition; + log::debug!("definition: '{definition}'"); + + let connection_summary = { + let mut connection_summary = BTreeMap::new(); + // TODO(lucasw) which caller_id is this if there are multiple publishers + // on this topic? + connection_summary.insert( + "caller_id".to_string(), + connection_header.caller_id, + ); + connection_summary.insert( + "latching".to_string(), + (connection_header.latching as u8).to_string(), + ); + if let Some(md5sum) = connection_header.md5sum { + connection_summary.insert("md5sum".to_string(), md5sum); + } + // TODO(lucasw) not sure why this is stored here, but `mcap convert` + // puts it in mcap channel metadata + if let Some(topic) = connection_header.topic { + connection_summary.insert("topic".to_string(), topic); + } + connection_summary + }; + + log::info!( + "'{topic}' connection summary: '{connection_summary:?}'" + ); + connection_summaries + .lock() + .unwrap() + .insert(topic.clone(), connection_summary); + + let channel_data_inner = + (topic.clone(), topic_type.clone(), definition); + let send_rv = channel_sender.send(channel_data_inner.clone()); + match send_rv { + Ok(()) => { + channel_data = Some(()); + // give some time for the receiver to process the + // new channel before sending a message below + tokio::time::sleep(Duration::from_millis(200)).await; } Err(e) => { - log::error!("get definition error {e:?}"); + log::error!( + "{channel_data_inner:?} {arrival_ns_epoch} {e:?}" + ); continue; } }