Skip to content

Commit

Permalink
Arbitrator should retry the failed service calls (#2231)
Browse files Browse the repository at this point in the history
* more logs

* add sleep for safeguard

* address PR

* address PR comments

* correct way

* address PR comments

* add continue statement

* add explicit cases
  • Loading branch information
MishkaMN authored Dec 26, 2023
1 parent b405ab4 commit 2c4f98a
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 33 deletions.
3 changes: 3 additions & 0 deletions arbitrator/include/capabilities_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <string>
#include <carma_planning_msgs/srv/plugin_list.hpp>
#include <carma_planning_msgs/srv/get_plugin_api.hpp>
#include <carma_planning_msgs/srv/plan_maneuvers.hpp>



namespace arbitrator
Expand Down Expand Up @@ -80,6 +82,7 @@ namespace arbitrator
protected:
private:
std::shared_ptr<carma_ros2_utils::CarmaLifecycleNode> nh_;
std::unordered_map<std::string,carma_ros2_utils::ClientPtr<carma_planning_msgs::srv::PlanManeuvers>> registered_strategic_plugins_;

carma_ros2_utils::ClientPtr<carma_planning_msgs::srv::GetPluginApi> sc_s_;
std::unordered_set <std::string> capabilities_ ;
Expand Down
102 changes: 84 additions & 18 deletions arbitrator/include/capabilities_interface.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,104 @@
#include <functional>
#include <carma_planning_msgs/srv/plan_maneuvers.hpp>
#include <rclcpp/exceptions/exceptions.hpp>

#include <chrono>
#include <thread>
namespace arbitrator
{
constexpr auto MAX_RETRY_ATTEMPTS {10};

template<typename MSrvReq, typename MSrvRes>
std::map<std::string, std::shared_ptr<MSrvRes>>
CapabilitiesInterface::multiplex_service_call_for_capability(const std::string& query_string, std::shared_ptr<MSrvReq> msg)
{
std::map<std::string, std::shared_ptr<MSrvRes>> responses;

for (const auto & topic : get_topics_for_capability(query_string))
bool topics_call_successful = false;
int retry_attempt = 0;
std::vector<std::string> detected_topics;
while (!topics_call_successful && retry_attempt < MAX_RETRY_ATTEMPTS)
{
try {
using std::literals::chrono_literals::operator""ms;
detected_topics = get_topics_for_capability(query_string);
topics_call_successful = true;
if (retry_attempt > 0)
{
RCLCPP_INFO_STREAM(rclcpp::get_logger("arbitrator")," Service call to get available strategic plugins successfully finished after retrying: " << retry_attempt);
}
} catch (const rclcpp::exceptions::RCLError& error) {
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"),
"Could not make a service call to get available strategic service topic names, with detected error: " << error.what() <<". So retrying, attempt no: " << retry_attempt);
retry_attempt ++;
}

}

if (!topics_call_successful)
{
RCLCPP_ERROR_STREAM(rclcpp::get_logger("arbitrator"),
"Failed to retrieve available strategic plugin service topics! Returning empty maneuver responses");
return responses;
}

std::vector<std::string> topics_to_retry = detected_topics;
std::vector<std::string> current_topics_to_check;
retry_attempt = 0;
while (!topics_to_retry.empty() && retry_attempt < MAX_RETRY_ATTEMPTS)
{
current_topics_to_check = topics_to_retry;
topics_to_retry.clear();
for (const auto & topic : current_topics_to_check)
{
try {
using std::literals::chrono_literals::operator""ms;

if (registered_strategic_plugins_.count(topic) == 0)
registered_strategic_plugins_[topic] = nh_->create_client<carma_planning_msgs::srv::PlanManeuvers>(topic);

auto client = registered_strategic_plugins_[topic];

const auto client = nh_->create_client<carma_planning_msgs::srv::PlanManeuvers>(topic);
RCLCPP_DEBUG_STREAM(rclcpp::get_logger("arbitrator"), "found client: " << topic);
if (client->wait_for_service(500ms))
RCLCPP_DEBUG_STREAM(rclcpp::get_logger("arbitrator"), "found client: " << topic);
else
{
topics_to_retry.push_back(topic);
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"), "Following client timed out: " << topic << ", retrying, attempt no: " << retry_attempt);
continue;
}

const auto response = client->async_send_request(msg);
const auto response = client->async_send_request(msg);

switch (const auto status{response.wait_for(500ms)}; status) {
case std::future_status::ready:
responses.emplace(topic, response.get());
break;
case std::future_status::deferred:
case std::future_status::timeout:
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"), "failed...: " << topic);
switch (const auto status{response.wait_for(500ms)}) {
case std::future_status::ready:
responses.emplace(topic, response.get());
break;
case std::future_status::deferred:
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"), "service call to " << topic << " is deferred... Please check if the plugin is active");
break;
case std::future_status::timeout:
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"), "service call to " << topic << " is timed out... Please check if the plugin is active");
break;
default:
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"), "service call to " << topic << " is failed... Please check if the plugin is active");
break;
}
if (retry_attempt > 0)
{
RCLCPP_INFO_STREAM(rclcpp::get_logger("arbitrator"),"Service call to: " << topic << ", successfully finished after retrying: " << retry_attempt);
}

} catch(const rclcpp::exceptions::RCLError& error) {
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"),
"Cannot make service request for service '" << topic << "': " << error.what() <<". So retrying, attempt no: " << retry_attempt);
topics_to_retry.push_back(topic);
}
} catch(const rclcpp::exceptions::RCLError& error) {
RCLCPP_ERROR_STREAM(rclcpp::get_logger("arbitrator"),
"Cannot make service request for service '" << topic << "': " << error.what());
continue;
}
retry_attempt ++;
}

if (retry_attempt >= MAX_RETRY_ATTEMPTS)
{
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"),
"Failed to get a valid response from one or all of the strategic plugins...");
}

return responses;
Expand Down
36 changes: 21 additions & 15 deletions arbitrator/src/tree_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace arbitrator
{
carma_planning_msgs::msg::ManeuverPlan TreePlanner::generate_plan(const VehicleState& start_state)
carma_planning_msgs::msg::ManeuverPlan TreePlanner::generate_plan(const VehicleState& start_state)
{
carma_planning_msgs::msg::ManeuverPlan root;
std::vector<std::pair<carma_planning_msgs::msg::ManeuverPlan, double>> open_list_to_evaluate;
Expand All @@ -33,7 +33,7 @@ namespace arbitrator

carma_planning_msgs::msg::ManeuverPlan longest_plan = root; // Track longest plan in case target length is never reached
rclcpp::Duration longest_plan_duration = rclcpp::Duration(0);


while (!open_list_to_evaluate.empty())
{
Expand All @@ -45,39 +45,39 @@ namespace arbitrator
carma_planning_msgs::msg::ManeuverPlan cur_plan = it->first;

RCLCPP_DEBUG_STREAM(rclcpp::get_logger("arbitrator"), "START");

for (auto mvr : cur_plan.maneuvers)
{
RCLCPP_DEBUG_STREAM(rclcpp::get_logger("arbitrator"), "Printing cur_plan: mvr: "<< (int)mvr.type);
}
}

RCLCPP_DEBUG_STREAM(rclcpp::get_logger("arbitrator"), "PRINT END");

auto plan_duration = rclcpp::Duration(0, 0); // zero duration

// If we're not at the root, plan_duration is nonzero (our plan should have maneuvers)
if (!cur_plan.maneuvers.empty())
if (!cur_plan.maneuvers.empty())
{
// get plan duration
plan_duration = arbitrator_utils::get_plan_end_time(cur_plan) - arbitrator_utils::get_plan_start_time(cur_plan);
plan_duration = arbitrator_utils::get_plan_end_time(cur_plan) - arbitrator_utils::get_plan_start_time(cur_plan);
}

// save longest if none of the plans have enough target duration
if (plan_duration > longest_plan_duration)
if (plan_duration > longest_plan_duration)
{
longest_plan_duration = plan_duration;
longest_plan = cur_plan;
}

// Evaluate plan_duration is sufficient do not expand more
if (plan_duration >= target_plan_duration_)
if (plan_duration >= target_plan_duration_)
{
final_open_list.push_back((*it));
RCLCPP_DEBUG_STREAM(rclcpp::get_logger("arbitrator"), "Has enough duration, skipping that which has following mvrs..:");
for (auto mvr : it->first.maneuvers)
{
RCLCPP_DEBUG_STREAM(rclcpp::get_logger("arbitrator"), "Printing mvr: "<< (int)mvr.type);
}
}
continue;
}

Expand All @@ -87,20 +87,26 @@ namespace arbitrator
// Compute cost for each child and store in open list
for (auto child = children.begin(); child != children.end(); child++)
{

if (child->maneuvers.empty())
{
RCLCPP_WARN_STREAM(rclcpp::get_logger("arbitrator"), "Child was empty for id: " << std::string(child->maneuver_plan_id));
continue;
continue;
}

temp_open_list.push_back(std::make_pair(*child, cost_function_->compute_cost_per_unit_distance(*child)));
}
}

open_list_to_evaluate = temp_open_list;
}

if (final_open_list.empty())
{
RCLCPP_ERROR_STREAM(rclcpp::get_logger("arbitrator"), "None of the strategic plugins generated any valid plans! Please check if any is turned and returning valid maneuvers...");
throw std::runtime_error("None of the strategic plugins generated any valid plans! Please check if any is turned and returning valid maneuvers...");
}

final_open_list = search_strategy_->prioritize_plans(final_open_list);

// now every plan has enough duration if possible and prioritized
Expand All @@ -111,10 +117,10 @@ namespace arbitrator
rclcpp::Duration plan_duration(0,0); // zero duration

// get plan duration
plan_duration = arbitrator_utils::get_plan_end_time(cur_plan) - arbitrator_utils::get_plan_start_time(cur_plan);
plan_duration = arbitrator_utils::get_plan_end_time(cur_plan) - arbitrator_utils::get_plan_start_time(cur_plan);

// Evaluate plan_duration is sufficient do not expand more
if (plan_duration >= target_plan_duration_)
if (plan_duration >= target_plan_duration_)
{
return cur_plan;
}
Expand Down

0 comments on commit 2c4f98a

Please sign in to comment.