Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RosTopicSubNode a StatefulActionNode and handle different reliability QoS configurations #95

Draft
wants to merge 7 commits into
base: humble
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions behaviortree_ros2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ target_include_directories(${PROJECT_NAME} PUBLIC

target_link_libraries(${PROJECT_NAME} bt_executor_parameters)

if (BUILD_TESTING)
add_subdirectory(test)
endif()

######################################################
# INSTALL
Expand Down
207 changes: 130 additions & 77 deletions behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

#pragma once

#include <behaviortree_cpp/action_node.h>
#include <behaviortree_cpp/basic_types.h>
#include <memory>
#include <rclcpp/logging.hpp>
#include <rclcpp/qos.hpp>
#include <string>
#include <rclcpp/executors.hpp>
#include <rclcpp/allocator/allocator_common.hpp>
#include "behaviortree_cpp/condition_node.h"
#include "behaviortree_cpp/bt_factory.h"

#include "behaviortree_ros2/ros_node_params.hpp"
#include <boost/signals2.hpp>
Expand All @@ -42,7 +44,7 @@ namespace BT
* 2. Otherwise, use the value in RosNodeParams::default_port_value
*/
template <class TopicT>
class RosTopicSubNode : public BT::ConditionNode
class RosTopicSubNode : public BT::StatefulActionNode
{
public:
// Type definitions
Expand All @@ -51,7 +53,9 @@ class RosTopicSubNode : public BT::ConditionNode
protected:
struct SubscriberInstance
{
SubscriberInstance(std::shared_ptr<rclcpp::Node> node, const std::string& topic_name);
SubscriberInstance(std::shared_ptr<rclcpp::Node> node, const std::string& topic_name,
const std::size_t history_depth,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just pass a complete QoS profile?

const rclcpp::ReliabilityPolicy& reliability);

std::shared_ptr<Subscriber> subscriber;
rclcpp::CallbackGroup::SharedPtr callback_group;
Expand Down Expand Up @@ -131,9 +135,13 @@ class RosTopicSubNode : public BT::ConditionNode
return providedBasicPorts({});
}

NodeStatus tick() override final;
NodeStatus onStart() override final;

/** Callback invoked in the tick. You must return either SUCCESS of FAILURE
NodeStatus onRunning() override final;

void onHalted() override final;

/** Callback invoked in the tick. You must return SUCCESS, FAILURE, or RUNNING.
*
* @param last_msg the latest message received, since the last tick.
* Will be empty if no new message received.
Expand All @@ -157,14 +165,17 @@ class RosTopicSubNode : public BT::ConditionNode

private:
bool createSubscriber(const std::string& topic_name);

NodeStatus checkStatus(const NodeStatus& status) const;
};

//----------------------------------------------------------------
//---------------------- DEFINITIONS -----------------------------
//----------------------------------------------------------------
template <class T>
inline RosTopicSubNode<T>::SubscriberInstance::SubscriberInstance(
std::shared_ptr<rclcpp::Node> node, const std::string& topic_name)
std::shared_ptr<rclcpp::Node> node, const std::string& topic_name,
const std::size_t history_depth, const rclcpp::ReliabilityPolicy& reliability)
{
// create a callback group for this particular instance
callback_group =
Expand All @@ -180,58 +191,24 @@ inline RosTopicSubNode<T>::SubscriberInstance::SubscriberInstance(
last_msg = msg;
broadcaster(msg);
};
subscriber = node->create_subscription<T>(topic_name, 1, callback, option);
subscriber = node->create_subscription<T>(
topic_name, rclcpp::QoS(history_depth).reliability(reliability), callback, option);
}

template <class T>
inline RosTopicSubNode<T>::RosTopicSubNode(const std::string& instance_name,
const NodeConfig& conf,
const RosNodeParams& params)
: BT::ConditionNode(instance_name, conf), node_(params.nh)
: BT::StatefulActionNode(instance_name, conf), node_(params.nh)
{
// check port remapping
auto portIt = config().input_ports.find("topic_name");
if(portIt != config().input_ports.end())
// Check if default_port_value was used to provide a topic name.
if(!params.default_port_value.empty())
{
const std::string& bb_topic_name = portIt->second;

if(bb_topic_name.empty() || bb_topic_name == "__default__placeholder__")
{
if(params.default_port_value.empty())
{
throw std::logic_error("Both [topic_name] in the InputPort and the RosNodeParams "
"are empty.");
}
else
{
createSubscriber(params.default_port_value);
}
}
else if(!isBlackboardPointer(bb_topic_name))
{
// If the content of the port "topic_name" is not
// a pointer to the blackboard, but a static string, we can
// create the client in the constructor.
createSubscriber(bb_topic_name);
}
else
{
// do nothing
// createSubscriber will be invoked in the first tick().
}
}
else
{
if(params.default_port_value.empty())
{
throw std::logic_error("Both [topic_name] in the InputPort and the RosNodeParams "
"are empty.");
}
else
{
createSubscriber(params.default_port_value);
}
topic_name_ = params.default_port_value;
}

// If no value was provided through the params, assume that the topic name will be set
// through a port when the node is first ticked.
}

template <class T>
Expand All @@ -243,25 +220,53 @@ inline bool RosTopicSubNode<T>::createSubscriber(const std::string& topic_name)
}
if(sub_instance_)
{
throw RuntimeError("Can't call createSubscriber more than once");
throw RuntimeError("Subscriber already exists");
}

// find SubscriberInstance in the registry
std::unique_lock lk(registryMutex());

auto node = node_.lock();
if(!node)
{
throw RuntimeError("The ROS node went out of scope. RosNodeParams doesn't take the "
"ownership of the node.");
}

const auto publisher_info = node->get_publishers_info_by_topic(topic_name);
if(publisher_info.empty())
{
RCLCPP_INFO(logger(),
"No publisher found on topic [%s]. Deferring creation of subscriber "
"until publisher exists.",
topic_name_.c_str());
return false;
}

rclcpp::ReliabilityPolicy publisher_reliability =
publisher_info.at(0).qos_profile().reliability();
for(std::size_t i = 1; i < publisher_info.size(); i++)
{
if(publisher_reliability != publisher_info.at(i).qos_profile().reliability())
{
RCLCPP_WARN_ONCE(logger(),
"Multiple publishers were found on topic [%s] with different QoS "
"reliability policies. The subscriber will use the reliability "
"setting from the first publisher it found, but this may result "
"in undesirable behavior.",
topic_name_.c_str());
break;
}
}

subscriber_key_ = std::string(node->get_fully_qualified_name()) + "/" + topic_name;

// find SubscriberInstance in the registry
std::unique_lock lk(registryMutex());

auto& registry = getRegistry();
auto it = registry.find(subscriber_key_);
if(it == registry.end() || it->second.expired())
{
sub_instance_ = std::make_shared<SubscriberInstance>(node, topic_name);
sub_instance_ =
std::make_shared<SubscriberInstance>(node, topic_name, 1, publisher_reliability);
registry.insert({ subscriber_key_, sub_instance_ });

RCLCPP_INFO(logger(), "Node [%s] created Subscriber to topic [%s]", name().c_str(),
Expand All @@ -282,45 +287,93 @@ inline bool RosTopicSubNode<T>::createSubscriber(const std::string& topic_name)
signal_connection_ = sub_instance_->broadcaster.connect(
[this](const std::shared_ptr<T> msg) { last_msg_ = msg; });

topic_name_ = topic_name;
return true;
}
template <class T>
inline NodeStatus RosTopicSubNode<T>::checkStatus(const NodeStatus& status) const
{
if(!isStatusActive(status))
{
throw std::logic_error("RosTopicSubNode: the callback must return SUCCESS, FAILURE, "
"or RUNNING");
}
return status;
};

template <class T>
inline NodeStatus RosTopicSubNode<T>::tick()
inline NodeStatus RosTopicSubNode<T>::onStart()
{
// First, check if the subscriber_ is valid and that the name of the
// topic_name in the port didn't change.
// otherwise, create a new subscriber
std::string topic_name;
getInput("topic_name", topic_name);

if(!topic_name.empty() && topic_name != "__default__placeholder__" &&
topic_name != topic_name_)
// If the topic name was provided through an input port, is a valid topic name, and is different
// from the stored topic name, update the stored topic name to the new string.
if(const auto topic_name = getInput<std::string>("topic_name");
topic_name.has_value() && !topic_name->empty() &&
topic_name.value() != "__default__placeholder__" && topic_name != topic_name_)
{
sub_instance_.reset();
topic_name_ = topic_name.value();
}

if(!sub_instance_)
// If we fail to create a subscriber, exit early and try again on the next tick
if(!createSubscriber(topic_name_))
{
createSubscriber(topic_name);
return NodeStatus::RUNNING;
}

auto CheckStatus = [](NodeStatus status) {
if(!isStatusCompleted(status))
{
throw std::logic_error("RosTopicSubNode: the callback must return"
"either SUCCESS or FAILURE");
}
return status;
};
// Discard any previous message unless latching is enabled.
// This enforces that the message must have been received after the current call to onStart(), even if this node has been ticked to completion previously.
if(!latchLastMessage())
{
last_msg_.reset();
}

// NOTE(schornakj): Something weird is happening here that prevents the subscriber from receiving a message if the publisher both initializes and publishes a message in between ticks.
// I think it has to do with the discovery process between the publisher and subscriber and how that relates to events being handled by the executor.
// This might depend on the middleware implemenation too.

sub_instance_->callback_group_executor.spin_some();
auto status = CheckStatus(onTick(last_msg_));

// If no message was received, return RUNNING
if(last_msg_ == nullptr)
{
return NodeStatus::RUNNING;
}

auto status = checkStatus(onTick(last_msg_));
Comment on lines +334 to +340
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be simpler if the user's implementation of onTick received the message by const reference instead of as a pointer. It would make it pretty unambiguous that if onTick is called then you've received a message, which is a bit friendlier than the current code where you need to check if the pointer actually points to anything before potentially handling the message. I didn't add this here yet because it's a breaking change that would impact all existing code derived from RosTopicSubNode.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would prevent the use case where you just want to fail if there is no message, as const ref requires the message to be there.

if(!latchLastMessage())
{
last_msg_.reset();
}
return status;
}

template <class T>
inline NodeStatus RosTopicSubNode<T>::onRunning()
{
// If the subscriber wasn't initialized during onStart(), attempt to create it here.
if(!sub_instance_)
{
// If the subscriber still cannot be created, return RUNNING and try again on the next tick.
if(!createSubscriber(topic_name_))
{
return NodeStatus::RUNNING;
}
}

sub_instance_->callback_group_executor.spin_some();

// If no message was received, return RUNNING
if(last_msg_ == nullptr)
{
return NodeStatus::RUNNING;
}

// TODO(schornakj): handle timeout here
Copy link
Author

@schornakj schornakj Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it useful to allow timing out while waiting for a message to be received? We could get it from RosNodeParams::server_timeout like we do in the action and service client nodes. Alternatively, it's possible to use behavior tree logic to sleep for some duration in parallel and trigger a failure if the sleep finishes before a message is received, so even without a timeout there's a way to break out of deadlock if needed.


auto status = checkStatus(onTick(last_msg_));

return status;
}

template <class T>
inline void RosTopicSubNode<T>::onHalted()
{}
} // namespace BT
7 changes: 7 additions & 0 deletions behaviortree_ros2/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
find_package(ament_cmake_gmock REQUIRED)
find_package(std_msgs REQUIRED)
find_package(launch_testing_ament_cmake REQUIRED)

ament_add_gmock(test_${PROJECT_NAME} test_bt_topic_sub_node.cpp)
target_link_libraries(test_${PROJECT_NAME} ${PROJECT_NAME})
ament_target_dependencies(test_${PROJECT_NAME} std_msgs)
Loading