Skip to content

Commit

Permalink
use new roslibrust get_connection_header to make a connection summary…
Browse files Browse the repository at this point in the history
… of the same style that mcap convert generates, but only one channel per topic currently instead of per-publisher
  • Loading branch information
lucasw committed Sep 9, 2024
1 parent 8b46a7b commit 589c733
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 24 deletions.
2 changes: 1 addition & 1 deletion mcap_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ crossterm = { version = "0.28.1", features = ["event-stream"] }
futures = "0.3.30"
futures-timer = "3.0.3"
log = "0.4.22"
log-once = "0.4.1"
mcap = "0.9.1"
memmap = "0.7.0"
# rand = "0.8.5"
Expand Down Expand Up @@ -41,7 +42,6 @@ branch = "subscribe_any"
git = "https://github.com/lucasw/roslibrust"
branch = "subscribe_any"
# path = "../../roslibrust/roslibrust_codegen_macro"
# path = "../../../../../other/src/rust/roslibrust/roslibrust_codegen_macro"
# version="0.10"

[[bin]]
Expand Down
99 changes: 76 additions & 23 deletions mcap_tools/src/mcap_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async fn mcap_record(
full_node_name: String,
prefix: String,
channel_receiver: mpsc::Receiver<(String, String, String)>,
connection_summaries: Arc<Mutex<HashMap<String, BTreeMap<String, String>>>>,
msg_receiver: mpsc::Receiver<(String, String, u64, u32, Vec<u8>)>,
finish: Arc<AtomicBool>,
num_received_messages: Arc<AtomicUsize>,
Expand Down Expand Up @@ -177,11 +178,31 @@ async fn mcap_record(

let schema = schemas.lock().unwrap().get(&topic_type).unwrap().clone();

// TODO(lucasw) use Entry
let connection_summary = {
match connection_summaries.lock().unwrap().get(&topic) {
Some(connection_summary) => {
log_once::debug_once!("{full_node_name} '{topic}' have summary {connection_summary:?}");
connection_summary.clone()
}
None => {
log_once::warn_once!("{full_node_name} '{topic}' no connection summary for '{topic_type}' yet, leaving empty");
BTreeMap::<String, String>::default()
}
}
};

// TODO(lucasw) need to create a BTreeMap and at least store the
// md5sum and topic (though that's redundant with the channel topic)
// in it to match what is in bags converted to mcap
// by `mcap convert`, and if latching and callerid are available
// put those in metadata also.

let channel = mcap::Channel {
topic: topic.clone(),
schema,
message_encoding: "ros1".to_string(),
metadata: BTreeMap::default(),
metadata: connection_summary,
};

let channel_id =
Expand Down Expand Up @@ -329,6 +350,8 @@ async fn main() -> Result<(), anyhow::Error> {
let (channel_sender, channel_receiver) = mpsc::sync_channel(800);
let (msg_sender, msg_receiver) = mpsc::sync_channel(24000);

let connection_summaries = Arc::new(Mutex::new(HashMap::new()));

// signal to stop recording- TODO(lucasw) should be a sync_channel?
let finish = Arc::new(AtomicBool::new(false));

Expand Down Expand Up @@ -364,6 +387,7 @@ async fn main() -> Result<(), anyhow::Error> {
full_node_name,
prefix.to_string(),
channel_receiver,
connection_summaries.clone(),
msg_receiver,
finish.clone(),
num_received_messages.clone(),
Expand Down Expand Up @@ -423,6 +447,7 @@ async fn main() -> Result<(), anyhow::Error> {
let msg_sender = msg_sender.clone();
let channel_sender = channel_sender.clone();
let num_received_messages = num_received_messages.clone();
let connection_summaries = connection_summaries.clone();

// let num_topics = topics.len();

Expand All @@ -446,30 +471,58 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(data) => {
if channel_data.is_none() {
log::debug!("get definition");
let definition = nh.inner.get_definition(topic.clone()).await;
match definition {
Ok(definition) => {
log::debug!("definition: '{definition}'");
let channel_data_inner =
(topic.clone(), topic_type.clone(), definition);
let send_rv =
channel_sender.send(channel_data_inner.clone());
match send_rv {
Ok(()) => {
channel_data = Some(());
// give some time for the receiver to process the
// new channel before sending a message below
tokio::time::sleep(Duration::from_millis(200))
.await;
}
Err(e) => {
log::error!("{channel_data_inner:?} {arrival_ns_epoch} {e:?}");
continue;
}
}
// TODO(lucasw) combine these calls, put the definition in the
// summary?
let connection_header =
nh.inner.get_connection_header(topic.clone()).await.unwrap();
let definition = connection_header.msg_definition;
log::debug!("definition: '{definition}'");

let connection_summary = {
let mut connection_summary = BTreeMap::new();
// TODO(lucasw) which caller_id is this if there are multiple publishers
// on this topic?
connection_summary.insert(
"caller_id".to_string(),
connection_header.caller_id,
);
connection_summary.insert(
"latching".to_string(),
(connection_header.latching as u8).to_string(),
);
if let Some(md5sum) = connection_header.md5sum {
connection_summary.insert("md5sum".to_string(), md5sum);
}
// TODO(lucasw) not sure why this is stored here, but `mcap convert`
// puts it in mcap channel metadata
if let Some(topic) = connection_header.topic {
connection_summary.insert("topic".to_string(), topic);
}
connection_summary
};

log::info!(
"'{topic}' connection summary: '{connection_summary:?}'"
);
connection_summaries
.lock()
.unwrap()
.insert(topic.clone(), connection_summary);

let channel_data_inner =
(topic.clone(), topic_type.clone(), definition);
let send_rv = channel_sender.send(channel_data_inner.clone());
match send_rv {
Ok(()) => {
channel_data = Some(());
// give some time for the receiver to process the
// new channel before sending a message below
tokio::time::sleep(Duration::from_millis(200)).await;
}
Err(e) => {
log::error!("get definition error {e:?}");
log::error!(
"{channel_data_inner:?} {arrival_ns_epoch} {e:?}"
);
continue;
}
}
Expand Down

0 comments on commit 589c733

Please sign in to comment.