diff --git a/CMakeLists.txt b/CMakeLists.txt index 21f2e96..ad4e961 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,6 +41,8 @@ ament_target_dependencies(thread_priority PUBLIC ${THIS_PACKAGE_INCLUDE_DEPENDS} # Unit Tests if(BUILD_TESTING) find_package(ament_cmake_gmock REQUIRED) + find_package(lifecycle_msgs REQUIRED) + find_package(rclcpp_lifecycle REQUIRED) find_package(test_msgs REQUIRED) ament_add_gmock(realtime_box_tests test/realtime_box_tests.cpp) @@ -72,6 +74,10 @@ if(BUILD_TESTING) test/realtime_server_goal_handle_tests.cpp) target_link_libraries(realtime_server_goal_handle_tests realtime_tools) ament_target_dependencies(realtime_server_goal_handle_tests test_msgs) + + ament_add_gmock(test_async_function_handler test/test_async_function_handler.cpp) + target_link_libraries(test_async_function_handler realtime_tools thread_priority) + ament_target_dependencies(test_async_function_handler lifecycle_msgs rclcpp_lifecycle) endif() # Install diff --git a/include/realtime_tools/async_function_handler.hpp b/include/realtime_tools/async_function_handler.hpp new file mode 100644 index 0000000..381e8e0 --- /dev/null +++ b/include/realtime_tools/async_function_handler.hpp @@ -0,0 +1,287 @@ +// Copyright 2024 PAL Robotics S.L. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// \author Sai Kishor Kothakota + +#ifndef REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_ +#define REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/duration.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/time.hpp" +#include "realtime_tools/thread_priority.hpp" + +namespace realtime_tools +{ +/** + * @brief Class to handle asynchronous function calls. + * AsyncFunctionHandler is a class that allows the user to have a asynchronous call to the parsed + * method and be able to set some thread specific parameters + */ +template +class AsyncFunctionHandler +{ +public: + AsyncFunctionHandler() = default; + + ~AsyncFunctionHandler() { stop_thread(); } + + /// Initialize the AsyncFunctionHandler with the callback and thread_priority + /** + * @param callback Function that will be called asynchronously + * If the AsyncFunctionHandler is already initialized and is running, it will throw a runtime + * error. + * If the parsed functions are not valid, it will throw a runtime error. + */ + void init( + std::function callback, + int thread_priority = 50) + { + if (callback == nullptr) { + throw std::runtime_error( + "AsyncFunctionHandler: parsed function to call asynchronously is not valid!"); + } + if (thread_.joinable()) { + throw std::runtime_error( + "AsyncFunctionHandler: Cannot reinitialize while the thread is " + "running. Please stop the async callback first!"); + } + async_function_ = callback; + thread_priority_ = thread_priority; + } + + /// Initialize the AsyncFunctionHandler with the callback, trigger_predicate and thread_priority + /** + * @param callback Function that will be called asynchronously. + * @param trigger_predicate Predicate function that will be called to check if the async callback + * method should be triggered or not. + * @param thread_priority Priority of the async worker thread. + * + * \note The parsed trigger_predicate should be free from any concurrency issues. It is expected + * to be both thread-safe and reentrant. + * + * If the AsyncFunctionHandler is already initialized and is running, it will throw a runtime + * error. + * If the parsed functions are not valid, it will throw a runtime error. + */ + void init( + std::function callback, + std::function trigger_predicate, int thread_priority = 50) + { + if (trigger_predicate == nullptr) { + throw std::runtime_error("AsyncFunctionHandler: parsed trigger predicate is not valid!"); + } + init(callback, thread_priority); + trigger_predicate_ = trigger_predicate; + } + + /// Triggers the async callback method cycle + /** + * @param time Current time + * @param period Current period + * @return A pair with the first element being a boolean indicating if the async callback method was + * triggered and the second element being the last return value of the async function. + * If the AsyncFunctionHandler is not initialized properly, it will throw a runtime error. + * If the callback method is waiting for the trigger, it will notify the async thread to start + * the callback. + * If the async callback method is still running, it will return the last return value from the + * last trigger cycle. + * + * \note In the case of controllers, The controller manager is responsible + * for triggering and maintaining the controller's update rate, as it should be only acting as a + * scheduler. Same applies to the resource manager when handling the hardware components. + */ + std::pair trigger_async_callback( + const rclcpp::Time & time, const rclcpp::Duration & period) + { + if (!is_initialized()) { + throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!"); + } + if (!is_running()) { + throw std::runtime_error( + "AsyncFunctionHandler: need to start the async callback thread first before triggering!"); + } + std::unique_lock lock(async_mtx_, std::try_to_lock); + bool trigger_status = false; + if (lock.owns_lock() && !trigger_in_progress_ && trigger_predicate_()) { + { + std::unique_lock scoped_lock(std::move(lock)); + trigger_in_progress_ = true; + current_callback_time_ = time; + current_callback_period_ = period; + } + async_callback_condition_.notify_one(); + trigger_status = true; + } + const T return_value = async_callback_return_; + return std::make_pair(trigger_status, return_value); + } + + /// Waits until the current async callback method trigger cycle is finished + /** + * If the async method is running, it will wait for the current async method call to finish. + */ + void wait_for_trigger_cycle_to_finish() + { + if (is_running()) { + std::unique_lock lock(async_mtx_); + cycle_end_condition_.wait(lock, [this] { return !trigger_in_progress_; }); + } + } + + /// Check if the AsyncFunctionHandler is initialized + /** + * @return True if the AsyncFunctionHandler is initialized, false otherwise + */ + bool is_initialized() const { return async_function_ && trigger_predicate_; } + + /// Join the async callback thread + /** + * If the async method is running, it will join the async thread. + * If the async method is not running, it will return immediately. + */ + void join_async_callback_thread() + { + if (is_running()) { + thread_.join(); + } + } + + /// Check if the async worker thread is running + /** + * @return True if the async worker thread is running, false otherwise + */ + bool is_running() const { return thread_.joinable(); } + + /// Check if the async callback is triggered to stop the cycle + /** + * @return True if the async callback is requested to be stopped, false otherwise + */ + bool is_stopped() const { return stop_async_callback_; } + + /// Get the async worker thread + /** + * @return The async callback thread + */ + std::thread & get_thread() { return thread_; } + + /// Check if the async callback method is in progress + /** + * @return True if the async callback method is in progress, false otherwise + */ + bool is_trigger_cycle_in_progress() const { return trigger_in_progress_; } + + /// Stops the callback thread + /** + * If the async method is running, it will notify the async thread to stop and then joins the + * async thread. + */ + void stop_thread() + { + if (is_running()) { + { + std::unique_lock lock(async_mtx_); + stop_async_callback_ = true; + } + async_callback_condition_.notify_one(); + thread_.join(); + } + } + + /// Get the last execution time of the async callback method + /** + * @return The last execution time of the async callback method in seconds + */ + double get_last_execution_time() const { return last_execution_time_; } + + /// Initializes and starts the callback thread + /** + * If the worker thread is not running, it will start the async callback thread. + * If the worker thread is already configured and running, does nothing and returns + * immediately. + */ + void start_thread() + { + if (!is_initialized()) { + throw std::runtime_error("AsyncFunctionHandler: need to be initialized first!"); + } + if (!thread_.joinable()) { + stop_async_callback_ = false; + trigger_in_progress_ = false; + last_execution_time_ = 0.0; + async_callback_return_ = T(); + thread_ = std::thread([this]() -> void { + if (!realtime_tools::configure_sched_fifo(thread_priority_)) { + RCLCPP_WARN( + rclcpp::get_logger("AsyncFunctionHandler"), + "Could not enable FIFO RT scheduling policy. Consider setting up your user to do FIFO " + "RT " + "scheduling. See " + "[https://control.ros.org/master/doc/ros2_control/controller_manager/doc/userdoc.html] " + "for details."); + } + + while (!stop_async_callback_.load(std::memory_order_relaxed)) { + { + std::unique_lock lock(async_mtx_); + async_callback_condition_.wait( + lock, [this] { return trigger_in_progress_ || stop_async_callback_; }); + if (!stop_async_callback_) { + const auto start_time = std::chrono::steady_clock::now(); + async_callback_return_ = + async_function_(current_callback_time_, current_callback_period_); + const auto end_time = std::chrono::steady_clock::now(); + last_execution_time_ = std::chrono::duration(end_time - start_time).count(); + } + trigger_in_progress_ = false; + } + cycle_end_condition_.notify_all(); + } + }); + } + } + +private: + rclcpp::Time current_callback_time_; + rclcpp::Duration current_callback_period_{0, 0}; + + std::function async_function_; + std::function trigger_predicate_ = []() { return true; }; + + // Async related variables + std::thread thread_; + int thread_priority_ = std::numeric_limits::quiet_NaN(); + std::atomic_bool stop_async_callback_{false}; + std::atomic_bool trigger_in_progress_{false}; + std::atomic async_callback_return_; + std::condition_variable async_callback_condition_; + std::condition_variable cycle_end_condition_; + std::mutex async_mtx_; + std::atomic last_execution_time_; +}; +} // namespace realtime_tools + +#endif // REALTIME_TOOLS__ASYNC_FUNCTION_HANDLER_HPP_ diff --git a/package.xml b/package.xml index 7658c85..8f575d2 100644 --- a/package.xml +++ b/package.xml @@ -23,6 +23,8 @@ rclcpp_action ament_cmake_gmock + lifecycle_msgs + rclcpp_lifecycle test_msgs diff --git a/test/test_async_function_handler.cpp b/test/test_async_function_handler.cpp new file mode 100644 index 0000000..eb0e11b --- /dev/null +++ b/test/test_async_function_handler.cpp @@ -0,0 +1,257 @@ +// Copyright 2024 PAL Robotics S.L. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test_async_function_handler.hpp" +#include "gmock/gmock.h" +#include "rclcpp/rclcpp.hpp" + +namespace realtime_tools +{ +TestAsyncFunctionHandler::TestAsyncFunctionHandler() +: state_(rclcpp_lifecycle::State(lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE, "test_async")), + counter_(0), + return_state_(return_type::OK) +{ +} + +void TestAsyncFunctionHandler::initialize() +{ + handler_.init( + std::bind( + &TestAsyncFunctionHandler::update, this, std::placeholders::_1, std::placeholders::_2), + [this]() { return state_.id() == lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE; }); +} + +std::pair TestAsyncFunctionHandler::trigger() +{ + return handler_.trigger_async_callback(rclcpp::Time(0, 0), rclcpp::Duration(0, 0)); +} + +return_type TestAsyncFunctionHandler::update( + const rclcpp::Time & /*time*/, const rclcpp::Duration & /*period*/) +{ + // to simulate some work being done + std::this_thread::sleep_for(std::chrono::microseconds(10)); + counter_++; + return return_state_; +} +const rclcpp_lifecycle::State & TestAsyncFunctionHandler::get_state() const { return state_; } + +int TestAsyncFunctionHandler::get_counter() const { return counter_; } +void TestAsyncFunctionHandler::activate() +{ + state_ = + rclcpp_lifecycle::State(lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE, state_.label()); +} + +void TestAsyncFunctionHandler::deactivate() +{ + state_ = + rclcpp_lifecycle::State(lifecycle_msgs::msg::State::PRIMARY_STATE_INACTIVE, state_.label()); +} +} // namespace realtime_tools +class AsyncFunctionHandlerTest : public ::testing::Test +{ +public: + static void SetUpTestCase() { rclcpp::init(0, nullptr); } + + static void TearDownTestCase() { rclcpp::shutdown(); } +}; + +TEST_F(AsyncFunctionHandlerTest, check_initialization) +{ + realtime_tools::TestAsyncFunctionHandler async_class; + + ASSERT_FALSE(async_class.get_handler().is_initialized()); + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + + // It should not be possible to initialize setting wrong functions + ASSERT_THROW(async_class.get_handler().init(nullptr, nullptr), std::runtime_error); + + async_class.initialize(); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + + // Once initialized, it should not be possible to initialize again + async_class.get_handler().start_thread(); + auto trigger_status = async_class.trigger(); + ASSERT_TRUE(trigger_status.first); + ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + ASSERT_THROW(async_class.initialize(), std::runtime_error); +} + +TEST_F(AsyncFunctionHandlerTest, check_triggering) +{ + realtime_tools::TestAsyncFunctionHandler async_class; + + ASSERT_FALSE(async_class.get_handler().is_initialized()); + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + // It shouldn't be possible to trigger without initialization + ASSERT_THROW(async_class.trigger(), std::runtime_error); + + async_class.initialize(); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + // It shouldn't be possible to trigger without starting the thread + ASSERT_THROW(async_class.trigger(), std::runtime_error); + async_class.get_handler().start_thread(); + + EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE); + auto trigger_status = async_class.trigger(); + ASSERT_TRUE(trigger_status.first); + ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + ASSERT_TRUE(async_class.get_handler().get_thread().joinable()); + ASSERT_TRUE(async_class.get_handler().is_trigger_cycle_in_progress()); + async_class.get_handler().wait_for_trigger_cycle_to_finish(); + async_class.get_handler().get_last_execution_time(); + ASSERT_FALSE(async_class.get_handler().is_trigger_cycle_in_progress()); + ASSERT_EQ(async_class.get_counter(), 1); + + // Trigger one more cycle + trigger_status = async_class.trigger(); + ASSERT_TRUE(trigger_status.first); + ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + async_class.get_handler().stop_thread(); + ASSERT_LE(async_class.get_counter(), 2); + + // now the async update should be preempted + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_TRUE(async_class.get_handler().is_stopped()); + async_class.get_handler().wait_for_trigger_cycle_to_finish(); +} + +TEST_F(AsyncFunctionHandlerTest, trigger_for_several_cycles) +{ + realtime_tools::TestAsyncFunctionHandler async_class; + + async_class.initialize(); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + async_class.get_handler().start_thread(); + + EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE); + + int missed_triggers = 0; + const int total_cycles = 1e5; + for (int i = 1; i < total_cycles; i++) { + const auto trigger_status = async_class.trigger(); + if (trigger_status.first) { + ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + async_class.get_handler().wait_for_trigger_cycle_to_finish(); + ASSERT_FALSE(async_class.get_handler().is_trigger_cycle_in_progress()); + ASSERT_EQ(async_class.get_counter(), i - missed_triggers); + } else { + missed_triggers++; + } + } + // Make sure that the failed triggers are less than 0.1% + ASSERT_LT(missed_triggers, static_cast(0.001 * total_cycles)) + << "The missed triggers cannot be more than 0.1%!"; + async_class.get_handler().stop_thread(); + + // now the async update should be preempted + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_TRUE(async_class.get_handler().is_stopped()); +} + +TEST_F(AsyncFunctionHandlerTest, test_with_deactivate_and_activate_cycles) +{ + realtime_tools::TestAsyncFunctionHandler async_class; + + // Start with a deactivated state + async_class.initialize(); + async_class.deactivate(); + ASSERT_THROW(async_class.trigger(), std::runtime_error) + << "Should throw before starting a thread"; + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + async_class.get_handler().start_thread(); + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_INACTIVE); + + // Now activate it and launch again + async_class.activate(); + EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE); + const int total_cycles = 100; + for (int i = 1; i < total_cycles; i++) { + const auto trigger_status = async_class.trigger(); + ASSERT_TRUE(trigger_status.first); + ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second); + ASSERT_TRUE(async_class.get_handler().is_trigger_cycle_in_progress()); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + async_class.get_handler().wait_for_trigger_cycle_to_finish(); + ASSERT_FALSE(async_class.get_handler().is_trigger_cycle_in_progress()); + ASSERT_EQ(async_class.get_counter(), i); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + // Now let's do one more trigger cycle and then change the state to deactivate, then it should end + // the thread once the cycle is finished + auto trigger_status = async_class.trigger(); + ASSERT_TRUE(trigger_status.first); + ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second); + async_class.deactivate(); + async_class.get_handler().wait_for_trigger_cycle_to_finish(); + for (int i = 0; i < 50; i++) { + const auto trigger_status = async_class.trigger(); + ASSERT_FALSE(trigger_status.first); + ASSERT_EQ(async_class.get_counter(), total_cycles) + << "The trigger should fail for any state different than ACTIVE"; + } + EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_INACTIVE); + ASSERT_TRUE(async_class.get_handler().is_initialized()); + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + + // Now let's test the case of activating it and then deactivating it when the thread is waiting + // for a trigger to start new update cycle execution + async_class.activate(); + + EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE); + trigger_status = async_class.trigger(); + ASSERT_TRUE(trigger_status.first); + ASSERT_EQ(realtime_tools::return_type::OK, trigger_status.second); + async_class.get_handler().wait_for_trigger_cycle_to_finish(); + async_class.deactivate(); + EXPECT_EQ(async_class.get_state().id(), lifecycle_msgs::msg::State::PRIMARY_STATE_INACTIVE); + // Now they continue to wait until a new cycle is triggered or the preempt is called + ASSERT_TRUE(async_class.get_handler().is_running()); + ASSERT_FALSE(async_class.get_handler().is_stopped()); + + // now the async update should be preempted + async_class.get_handler().stop_thread(); + ASSERT_FALSE(async_class.get_handler().is_running()); + ASSERT_TRUE(async_class.get_handler().is_stopped()); +} diff --git a/test/test_async_function_handler.hpp b/test/test_async_function_handler.hpp new file mode 100644 index 0000000..c480543 --- /dev/null +++ b/test/test_async_function_handler.hpp @@ -0,0 +1,60 @@ +// Copyright 2024 PAL Robotics S.L. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TEST_ASYNC_FUNCTION_HANDLER_HPP_ +#define TEST_ASYNC_FUNCTION_HANDLER_HPP_ + +#include + +#include "lifecycle_msgs/msg/state.hpp" +#include "rclcpp_lifecycle/state.hpp" +#include "realtime_tools/async_function_handler.hpp" + +namespace realtime_tools +{ +enum class return_type : std::uint8_t { + OK = 0, + ERROR = 1, + DEACTIVATE = 2, +}; + +class TestAsyncFunctionHandler +{ +public: + TestAsyncFunctionHandler(); + + void initialize(); + + realtime_tools::AsyncFunctionHandler & get_handler() { return handler_; } + + std::pair trigger(); + + return_type update(const rclcpp::Time & time, const rclcpp::Duration & period); + + const rclcpp_lifecycle::State & get_state() const; + + int get_counter() const; + + void activate(); + + void deactivate(); + +private: + rclcpp_lifecycle::State state_; + int counter_; + return_type return_state_; + realtime_tools::AsyncFunctionHandler handler_; +}; +} // namespace realtime_tools +#endif // TEST_ASYNC_FUNCTION_HANDLER_HPP_