Skip to content

Commit

Permalink
Callback after cancel (#2281)
Browse files Browse the repository at this point in the history
* feat(Client): Added function to stop callbacks of a goal handle

This function allows us to drop the handle in a locked context.
If we do not do this within a lock, there will be a race condition between
the deletion of the shared_ptr of the handle and the result / feedback
callbacks.

* fix: make Client goal handle recursive

This fixes deadlocks due to release of goal handles in callbacks etc.

* fix(ActionGoalClient): Fixed memory leak for nominal case

This fixes a memory leak due to a self reference in the ClientGoalHandle.
Note, this fix will only work, if the ClientGoalHandle ever receives
a result callback.

* doc: Updated documentation of rclcpp_action::Client::async_send_goal
* docs: Made the async_send_goal documentation more explicit

Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
Co-authored-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
jmachowinski and Janosch Machowinski authored Apr 15, 2024
1 parent 90e4511 commit 839348c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 13 deletions.
93 changes: 80 additions & 13 deletions rclcpp_action/include/rclcpp_action/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,22 @@ class Client : public ClientBase

/// Send an action goal and asynchronously get the result.
/**
* If the goal is accepted by an action server, the returned future is set to a `ClientGoalHandle`.
* If the goal is accepted by an action server, the returned future is set to a `GoalHandle::SharedPtr`.
* If the goal is rejected by an action server, then the future is set to a `nullptr`.
*
* The returned goal handle is used to monitor the status of the goal and get the final result.
* It is valid as long as you hold a reference to the shared pointer or until the
* rclcpp_action::Client is destroyed at which point the goal status will become UNKNOWN.
* The goal handle in the future is used to monitor the status of the goal and get the final result.
*
* If callbacks were set in @param options, you will receive callbacks, as long as you hold a reference
* to the shared pointer contained in the returned future, or rclcpp_action::Client is destroyed. Dropping
* the shared pointer to the goal handle will not cancel the goal. In order to cancel it, you must explicitly
* call async_cancel_goal.
*
* WARNING this method has inconsistent behaviour and a memory leak bug.
* If you set the result callback in @param options, the handle will be self referencing, and you will receive
* callbacks even though you do not hold a reference to the shared pointer. In this case, the self reference will
* be deleted if the result callback was received. If there is no result callback, there will be a memory leak.
*
* To prevent the memory leak, you may call stop_callbacks() explicit. This will delete the self reference.
*
* \param[in] goal The goal request.
* \param[in] options Options for sending the goal request. Contains references to callbacks for
Expand Down Expand Up @@ -448,7 +458,7 @@ class Client : public ClientBase
std::shared_ptr<GoalHandle> goal_handle(
new GoalHandle(goal_info, options.feedback_callback, options.result_callback));
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> guard(goal_handles_mutex_);
goal_handles_[goal_handle->get_goal_id()] = goal_handle;
}
promise->set_value(goal_handle);
Expand All @@ -466,7 +476,7 @@ class Client : public ClientBase
// To prevent the list from growing out of control, forget about any goals
// with no more user references
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> guard(goal_handles_mutex_);
auto goal_handle_it = goal_handles_.begin();
while (goal_handle_it != goal_handles_.end()) {
if (!goal_handle_it->second.lock()) {
Expand Down Expand Up @@ -496,7 +506,7 @@ class Client : public ClientBase
typename GoalHandle::SharedPtr goal_handle,
ResultCallback result_callback = nullptr)
{
std::lock_guard<std::mutex> lock(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> lock(goal_handles_mutex_);
if (goal_handles_.count(goal_handle->get_goal_id()) == 0) {
throw exceptions::UnknownGoalHandleError();
}
Expand Down Expand Up @@ -531,7 +541,7 @@ class Client : public ClientBase
typename GoalHandle::SharedPtr goal_handle,
CancelCallback cancel_callback = nullptr)
{
std::lock_guard<std::mutex> lock(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> lock(goal_handles_mutex_);
if (goal_handles_.count(goal_handle->get_goal_id()) == 0) {
throw exceptions::UnknownGoalHandleError();
}
Expand Down Expand Up @@ -562,6 +572,63 @@ class Client : public ClientBase
return async_cancel(cancel_request, cancel_callback);
}

/// Stops the callbacks for the goal in a thread safe way
/**
* This will NOT cancel the goal, it will only stop the callbacks.
*
* After the call to this function, it is guaranteed that there
* will be no more callbacks from the goal. This is not guaranteed
* if multiple threads are involved, and the goal_handle is just
* dropped.
*
* \param[in] goal_handle The goal were the callbacks shall be stopped
*/
void stop_callbacks(typename GoalHandle::SharedPtr goal_handle)
{
goal_handle->set_feedback_callback(typename GoalHandle::FeedbackCallback());
goal_handle->set_result_callback(typename GoalHandle::ResultCallback());

std::lock_guard<std::recursive_mutex> guard(goal_handles_mutex_);
const GoalUUID & goal_id = goal_handle->get_goal_id();
auto it = goal_handles_.find(goal_id);
if (goal_handles_.end() == it) {
// someone else already deleted the entry
// e.g. the result callback
RCLCPP_DEBUG(
this->get_logger(),
"Given goal is unknown. Ignoring...");
return;
}
goal_handles_.erase(it);
}

/// Stops the callbacks for the goal in a thread safe way
/**
* For futher information see stop_callbacks(typename GoalHandle::SharedPtr goal_handle)
*/
void stop_callbacks(const GoalUUID & goal_id)
{
typename GoalHandle::SharedPtr goal_handle;
{
std::lock_guard<std::recursive_mutex> guard(goal_handles_mutex_);
auto it = goal_handles_.find(goal_id);
if (goal_handles_.end() == it) {
// someone else already deleted the entry
// e.g. the result callback
RCLCPP_DEBUG(
this->get_logger(),
"Given goal is unknown. Ignoring...");
return;
}

goal_handle = it->lock();
}

if (goal_handle) {
stop_callbacks(goal_handle);
}
}

/// Asynchronously request all goals at or before a specified time be canceled.
/**
* \param[in] stamp The timestamp for the cancel goal request.
Expand Down Expand Up @@ -590,7 +657,7 @@ class Client : public ClientBase
virtual
~Client()
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> guard(goal_handles_mutex_);
auto it = goal_handles_.begin();
while (it != goal_handles_.end()) {
typename GoalHandle::SharedPtr goal_handle = it->second.lock();
Expand Down Expand Up @@ -637,7 +704,7 @@ class Client : public ClientBase
void
handle_feedback_message(std::shared_ptr<void> message) override
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> guard(goal_handles_mutex_);
using FeedbackMessage = typename ActionT::Impl::FeedbackMessage;
typename FeedbackMessage::SharedPtr feedback_message =
std::static_pointer_cast<FeedbackMessage>(message);
Expand Down Expand Up @@ -674,7 +741,7 @@ class Client : public ClientBase
void
handle_status_message(std::shared_ptr<void> message) override
{
std::lock_guard<std::mutex> guard(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> guard(goal_handles_mutex_);
using GoalStatusMessage = typename ActionT::Impl::GoalStatusMessage;
auto status_message = std::static_pointer_cast<GoalStatusMessage>(message);
for (const GoalStatus & status : status_message->status_list) {
Expand Down Expand Up @@ -723,7 +790,7 @@ class Client : public ClientBase
wrapped_result.goal_id = goal_handle->get_goal_id();
wrapped_result.code = static_cast<ResultCode>(result_response->status);
goal_handle->set_result(wrapped_result);
std::lock_guard<std::mutex> lock(goal_handles_mutex_);
std::lock_guard<std::recursive_mutex> lock(goal_handles_mutex_);
goal_handles_.erase(goal_handle->get_goal_id());
});
} catch (rclcpp::exceptions::RCLError & ex) {
Expand Down Expand Up @@ -755,7 +822,7 @@ class Client : public ClientBase
}

std::map<GoalUUID, typename GoalHandle::WeakPtr> goal_handles_;
std::mutex goal_handles_mutex_;
std::recursive_mutex goal_handles_mutex_;
};
} // namespace rclcpp_action

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ ClientGoalHandle<ActionT>::set_result(const WrappedResult & wrapped_result)
result_promise_.set_value(wrapped_result);
if (result_callback_) {
result_callback_(wrapped_result);
result_callback_ = ResultCallback();
}
}

Expand Down

0 comments on commit 839348c

Please sign in to comment.