Skip to content

Commit

Permalink
Multithreaded rtabmap node (#1214)
Browse files Browse the repository at this point in the history
* Multi-threaded rtabmap node

* Moved data sync logic inside CoreWrapper to process odometry topics outside processing thread

* updated commend

* re-enabled jazzy

* disabled jazzy for this mr

* Added async callbackgroup for imu and landmark topics

* Making all shared variables thread-safe
  • Loading branch information
matlabbe authored Oct 16, 2024
1 parent 1950b95 commit ad97f56
Show file tree
Hide file tree
Showing 19 changed files with 772 additions and 524 deletions.
25 changes: 13 additions & 12 deletions rtabmap_conversions/src/MsgConversion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1691,18 +1691,6 @@ rtabmap::OdometryInfo odomInfoFromROS(const rtabmap_msgs::msg::OdomInfo & msg, b
info.localBundleOutliers = msg.local_bundle_outliers;
info.localBundleConstraints = msg.local_bundle_constraints;
info.localBundleTime = msg.local_bundle_time;
UASSERT(msg.local_bundle_models.size() == msg.local_bundle_ids.size());
UASSERT(msg.local_bundle_models.size() == msg.local_bundle_poses.size());
for(size_t i=0; i<msg.local_bundle_ids.size(); ++i)
{
std::vector<rtabmap::CameraModel> models;
for(size_t j=0; j<msg.local_bundle_models[i].models.size(); ++j)
{
models.push_back(cameraModelFromROS(msg.local_bundle_models[i].models[j].camera_info, transformFromGeometryMsg(msg.local_bundle_models[i].models[j].local_transform)));
}
info.localBundleModels.insert(std::make_pair(msg.local_bundle_ids[i], models));
info.localBundlePoses.insert(std::make_pair(msg.local_bundle_ids[i], transformFromPoseMsg(msg.local_bundle_poses[i])));
}
info.keyFrameAdded = msg.key_frame_added;
info.timeEstimation = msg.time_estimation;
info.timeParticleFiltering = msg.time_particle_filtering;
Expand All @@ -1720,6 +1708,19 @@ rtabmap::OdometryInfo odomInfoFromROS(const rtabmap_msgs::msg::OdomInfo & msg, b

if(!ignoreData)
{
UASSERT(msg.local_bundle_models.size() == msg.local_bundle_ids.size());
UASSERT(msg.local_bundle_models.size() == msg.local_bundle_poses.size());
for(size_t i=0; i<msg.local_bundle_ids.size(); ++i)
{
std::vector<rtabmap::CameraModel> models;
for(size_t j=0; j<msg.local_bundle_models[i].models.size(); ++j)
{
models.push_back(cameraModelFromROS(msg.local_bundle_models[i].models[j].camera_info, transformFromGeometryMsg(msg.local_bundle_models[i].models[j].local_transform)));
}
info.localBundleModels.insert(std::make_pair(msg.local_bundle_ids[i], models));
info.localBundlePoses.insert(std::make_pair(msg.local_bundle_ids[i], transformFromPoseMsg(msg.local_bundle_poses[i])));
}

UASSERT(msg.words_keys.size() == msg.words_values.size());
for(unsigned int i=0; i<msg.words_keys.size(); ++i)
{
Expand Down
42 changes: 38 additions & 4 deletions rtabmap_slam/include/rtabmap_slam/CoreWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib

private:
bool odomUpdate(const nav_msgs::msg::Odometry & odomMsg, rclcpp::Time stamp);
bool odomTFUpdate(const rclcpp::Time & stamp); // TF odom
bool odomTFUpdate(const std::string & odomFrameId, const rclcpp::Time & stamp); // TF odom

// Callback called from sync thread
virtual void commonMultiCameraCallback(
const nav_msgs::msg::Odometry::ConstSharedPtr & odomMsg,
const rtabmap_msgs::msg::UserData::ConstSharedPtr & userDataMsg,
Expand All @@ -134,6 +135,7 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
const std::vector<std::vector<rtabmap_msgs::msg::KeyPoint> > & localKeyPoints = std::vector<std::vector<rtabmap_msgs::msg::KeyPoint> >(),
const std::vector<std::vector<rtabmap_msgs::msg::Point3f> > & localPoints3d = std::vector<std::vector<rtabmap_msgs::msg::Point3f> >(),
const std::vector<cv::Mat> & localDescriptors = std::vector<cv::Mat>());
// Callback called from sync thread
void commonMultiCameraCallbackImpl(
const std::string & odomFrameId,
const rtabmap_msgs::msg::UserData::ConstSharedPtr & userDataMsg,
Expand All @@ -148,18 +150,21 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
const std::vector<std::vector<rtabmap_msgs::msg::KeyPoint> > & localKeyPoints,
const std::vector<std::vector<rtabmap_msgs::msg::Point3f> > & localPoints3d,
const std::vector<cv::Mat> & localDescriptors);
// Callback called from sync thread
virtual void commonLaserScanCallback(
const nav_msgs::msg::Odometry::ConstSharedPtr & odomMsg,
const rtabmap_msgs::msg::UserData::ConstSharedPtr & userDataMsg,
const sensor_msgs::msg::LaserScan & scanMsg,
const sensor_msgs::msg::PointCloud2 & scan3dMsg,
const rtabmap_msgs::msg::OdomInfo::ConstSharedPtr& odomInfoMsg,
const rtabmap_msgs::msg::GlobalDescriptor & globalDescriptor = rtabmap_msgs::msg::GlobalDescriptor());
// Callback called from sync thread
virtual void commonOdomCallback(
const nav_msgs::msg::Odometry::ConstSharedPtr & odomMsg,
const rtabmap_msgs::msg::UserData::ConstSharedPtr & userDataMsg,
const rtabmap_msgs::msg::OdomInfo::ConstSharedPtr& odomInfoMsg);

// Callback called from sync thread
virtual void commonSensorDataCallback(
const rtabmap_msgs::msg::SensorData::ConstSharedPtr & sensorDataMsg,
const nav_msgs::msg::Odometry::ConstSharedPtr & odomMsg,
Expand Down Expand Up @@ -195,6 +200,8 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
void goalNodeCallback(const rtabmap_msgs::msg::Goal::SharedPtr msg);
void updateGoal(const rclcpp::Time & stamp);

void processAsync();

void process(
const rclcpp::Time & stamp,
rtabmap::SensorData & data,
Expand Down Expand Up @@ -266,11 +273,14 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
private:
rtabmap::Rtabmap rtabmap_;
bool paused_;

UMutex lastPoseMutex_;
rtabmap::Transform lastPose_;
rclcpp::Time lastPoseStamp_;
std::vector<float> lastPoseVelocity_;
cv::Mat lastPoseCovariance_;
bool lastPoseIntermediate_;
cv::Mat covariance_;

rtabmap::Transform currentMetricGoal_;
rtabmap::Transform lastPublishedMetricGoal_;
bool latestNodeWasReached_;
Expand Down Expand Up @@ -341,7 +351,7 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
std::shared_ptr<tf2_ros::Buffer> tfBuffer_;
std::shared_ptr<tf2_ros::TransformListener> tfListener_;

rclcpp::SyncParametersClient::SharedPtr parametersClient_;
rclcpp::AsyncParametersClient::SharedPtr parametersClient_;
rclcpp::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr parameterEventSub_;

rclcpp::Service<std_srvs::srv::Empty>::SharedPtr updateSrv_;
Expand Down Expand Up @@ -390,6 +400,7 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
// for loop closure detection only
image_transport::Subscriber defaultSub_;

rclcpp::CallbackGroup::SharedPtr userDataAsyncCallbackGroup_;
rclcpp::Subscription<rtabmap_msgs::msg::UserData>::SharedPtr userDataAsyncSub_;
cv::Mat userData_;
UMutex userDataMutex_;
Expand All @@ -398,6 +409,8 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
geometry_msgs::msg::PoseWithCovarianceStamped globalPose_;
rclcpp::Subscription<sensor_msgs::msg::NavSatFix>::SharedPtr gpsFixAsyncSub_;
rtabmap::GPS gps_;

rclcpp::CallbackGroup::SharedPtr landmarkCallbackGroup_;
rclcpp::Subscription<rtabmap_msgs::msg::LandmarkDetection>::SharedPtr landmarkDetectionSub_;
rclcpp::Subscription<rtabmap_msgs::msg::LandmarkDetections>::SharedPtr landmarkDetectionsSub_;
#ifdef WITH_APRILTAG_MSGS
Expand All @@ -407,10 +420,14 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
rclcpp::Subscription<fiducial_msgs::msg::FiducialTransformArray>::SharedPtr fiducialTransfromsSub_;
#endif
std::map<int, std::pair<geometry_msgs::msg::PoseWithCovarianceStamped, float> > landmarks_; // id, <pose, size>
rclcpp::Subscription<sensor_msgs::msg::Imu>::SharedPtr imuSub_;
UMutex landmarksMutex_;

rclcpp::CallbackGroup::SharedPtr imuCallbackGroup_;
rclcpp::Subscription<sensor_msgs::msg::Imu>::SharedPtr imuSub_;
std::map<double, rtabmap::Transform> imus_;
std::string imuFrameId_;
UMutex imuMutex_;

rclcpp::Subscription<std_msgs::msg::Int32MultiArray>::SharedPtr republishNodeDataSub_;

rclcpp::Subscription<nav_msgs::msg::Odometry>::SharedPtr interOdomSub_;
Expand Down Expand Up @@ -444,6 +461,23 @@ class CoreWrapper : public rclcpp::Node, public rtabmap_sync::CommonDataSubscrib
double localizationError_;
};
LocalizationStatusTask localizationDiagnostic_;

rclcpp::CallbackGroup::SharedPtr processingCallbackGroup_;
struct SyncData {
bool valid;
rclcpp::Time stamp;
rtabmap::SensorData data;
rtabmap::Transform odom;
std::vector<float> odomVelocity;
std::string odomFrameId;
cv::Mat odomCovariance;
rtabmap::OdometryInfo odomInfo;
double timeMsgConversion;
};
rclcpp::TimerBase::SharedPtr syncTimer_;
SyncData syncData_;
UMutex syncDataMutex_;
bool triggerNewMapBeforeNextUpdate_;
};

}
Expand Down
5 changes: 4 additions & 1 deletion rtabmap_slam/src/CoreNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ int main(int argc, char** argv)
rclcpp::init(argc, argv);
rclcpp::NodeOptions options;
options.arguments(arguments);
auto node = std::make_shared<rtabmap_slam::CoreWrapper>(options);
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
UINFO("rtabmap %s started...", RTABMAP_VERSION);
rclcpp::spin(std::make_shared<rtabmap_slam::CoreWrapper>(options));
executor.spin();
rclcpp::shutdown();
return 0;
}
Loading

0 comments on commit ad97f56

Please sign in to comment.