Skip to content

Commit

Permalink
add a service to stop recording and exit
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasw committed Sep 4, 2024
1 parent 1710a32 commit 2f7aa78
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ubuntu_22_04.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ jobs:
- name: more apt installs
run: |
sudo apt install -yqq python3-rosmsg rospack-tools libsensor-msgs-dev libstd-msgs-dev libgeometry-msgs-dev ros-sensor-msgs ros-std-msgs ros-geometry-msgs cargo pkg-config
sudo apt install -yqq libstd-srvs-dev python3-std-srvs ros-std-srvs
sudo apt install -yqq librust-openssl-dev
rospack find geometry_msgs
- name:
run: |
# TODO(lucasw) build docs, and run tests
cd mcap_tools/mcap_tools
ROS_PACKAGE_PATH=`rospack find std_msgs`:`rospack find sensor_msgs`:`rospack find geometry_msgs` cargo build --release
ROS_PACKAGE_PATH=`rospack find std_msgs`:`rospack find sensor_msgs`:`rospack find geometry_msgs`:`rospack find std_srvs` cargo build --release
target/release/mcap_record --version
target/release/mcap_record --help
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ data_2024_08_26_09_15_08_-07_00_00001.mcap
## build

```
ROS_PACKAGE_PATH=`rospack find std_msgs`:`rospack find sensor_msgs`:`rospack find geometry_msgs` cargo build --release
ROS_PACKAGE_PATH=`rospack find std_msgs`:`rospack find sensor_msgs`:`rospack find geometry_msgs`:`rospack find std_srvs` cargo build --release
```

### build and run in one line (for development)
Expand Down
38 changes: 36 additions & 2 deletions mcap_tools/src/mcap_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::time::SystemTime;
use std::{collections::BTreeMap, fs, io::BufWriter};
use tokio::time::Duration;

use crate::misc::std_srvs;

fn rename_active(mcap_name: &str) -> std::io::Result<()> {
// TODO(lucasw) only replace last occurence
let inactive_name = mcap_name.replace(".mcap.active", ".mcap");
Expand All @@ -25,12 +27,12 @@ async fn mcap_record(
prefix: String,
channel_receiver: mpsc::Receiver<(String, String, String)>,
msg_receiver: mpsc::Receiver<(String, String, u64, u32, Vec<u8>)>,
finish: Arc<AtomicBool>,
num_received_messages: Arc<AtomicUsize>,
size_limit: u64,
) -> Result<(), anyhow::Error> {
// schemas should be able to persist across mcap file boundaries
let schemas = Arc::new(Mutex::new(HashMap::new()));
let finish = Arc::new(AtomicBool::new(false));

// Setup a task to kill this process when ctrl_c comes in:
{
Expand Down Expand Up @@ -104,7 +106,7 @@ async fn mcap_record(
let file_handle_metadata = file_handle.metadata();
if let Ok(file_handle_metadata) = file_handle_metadata {
let size_mb = file_handle_metadata.len() as f64 / (1024.0 * 1024.0);
if count % 2000 == 0 {
if count % 2000 == 1 {
log::info!("size {} / {size_limit} MB", size_mb as u64);
}
if size_mb > size_limit as f64 {
Expand Down Expand Up @@ -327,11 +329,43 @@ async fn main() -> Result<(), anyhow::Error> {
let (channel_sender, channel_receiver) = mpsc::sync_channel(800);
let (msg_sender, msg_receiver) = mpsc::sync_channel(24000);

// signal to stop recording- TODO(lucasw) should be a sync_channel?
let finish = Arc::new(AtomicBool::new(false));

// service that can shut down recording
// TODO(lucasw) later have this start or stop recording, and optionally the node can start
let _stop_server_handle;
{
let finish = finish.clone();
let full_node_name = full_node_name.clone();
let server_fn = move |request: std_srvs::SetBoolRequest| -> Result<
std_srvs::SetBoolResponse,
Box<dyn std::error::Error + Send + Sync>,
> {
let mut text = format!("{full_node_name} set recording to: {request:?}");
log::info!("{text}");
if !request.data {
let warn_text = "- stop request, going to finish and exit".to_string();
log::warn!("{warn_text}");
text += &warn_text;
finish.store(true, Ordering::SeqCst);
}
Ok(std_srvs::SetBoolResponse {
success: true,
message: text.to_string(),
})
};
_stop_server_handle = nh
.advertise_service::<std_srvs::SetBool, _>("~/set_recording", server_fn)
.await?;
};

let _ = mcap_record(
full_node_name,
prefix.to_string(),
channel_receiver,
msg_receiver,
finish.clone(),
num_received_messages.clone(),
size_limit,
)
Expand Down

0 comments on commit 2f7aa78

Please sign in to comment.