Skip to content

Commit

Permalink
Apply thread settings (#3874)
Browse files Browse the repository at this point in the history
* Refs #19436. Added thread creation wrapper infrastructure.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Added empty implementation for apply_thread_settings_to_current_thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Refactor on Log.cpp

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Add implementation for setting scheduler and priority.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Add implementation for setting cpu affinity.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Add test setting config for Log thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Fix SystemInfoTests link issue.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Changes on ResourceEvent.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Changes on DataSharingListener.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Changes on FlowControllerImpl.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Changes on security LogTopic.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Apply settings on SharedMemWatchdog.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Apply settings on SharedMem reception threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Apply settings on SharedMem packet dump threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Apply settings on UDP reception threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Apply settings on TCP accept and keep_alive threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Apply settings on TCP reception threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Include what you use.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Add MacOS implementation for setting scheduler and priority.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Add MacOS implementation for setting thread affinity.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19437. Member cpu_mask changed to affinity and made it 64 bits.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19437. Windows implementation for thread affinity.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19437. Windows implementation for thread priority.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Made `get_thread_config_for_port` a const method.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19436. Apply suggestions from code review.

Signed-off-by: Miguel Company <[email protected]>

Co-authored-by: Eduardo Ponz Segrelles <[email protected]>

* Refs #19435. Some refactors on FileWatch:
- Namespace moved to eprosima::filewatch
- Constructor receives thread settings
- Copy constructors deleted

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. SystemInfo::watch_file receives thread settings.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Added RTPSDomain::set_filewatch_thread_config

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Call RTPSDomain::set_filewatch_thread_config inside DomainParticipantFactory::create_participant

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Change priority default value.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Account for default values in threading_pthread

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Account for default values in threading_osx

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Account for default values in threading_win32

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Linters.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Use C++ headers.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Documentation updates.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Suggestions on Log test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19435. Removed unused overload of create_thread.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Eduardo Ponz Segrelles <[email protected]>
  • Loading branch information
MiguelCompany and EduPonz committed Nov 13, 2023
1 parent 285fd36 commit 77f2c8a
Show file tree
Hide file tree
Showing 44 changed files with 685 additions and 235 deletions.
13 changes: 13 additions & 0 deletions include/fastdds/rtps/RTPSDomain.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <set>

#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/history/IChangePool.h>
Expand Down Expand Up @@ -54,6 +55,18 @@ class RTPSDomain
{
public:

/**
* Method to set the configuration of the threads created by the file watcher for the environment file.
* In order for these settings to take effect, this method must be called before the first call
* to @ref createParticipant.
*
* @param watch_thread Settings for the thread watching the environment file.
* @param callback_thread Settings for the thread executing the callback when the environment file changed.
*/
RTPS_DllAPI static void set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread);

/**
* Method to shut down all RTPSParticipants, readers, writers, etc.
* It must be called at the end of the process to avoid memory leaks.
Expand Down
14 changes: 9 additions & 5 deletions include/fastdds/rtps/attributes/ThreadSettings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

#include <cstdint>
#include <limits>

#include <fastrtps/fastrtps_dll.h>

Expand All @@ -41,6 +42,7 @@ struct RTPS_DllAPI ThreadSettings
* A value of -1 indicates system default.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* It is ignored on Windows platforms.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
Expand All @@ -50,25 +52,27 @@ struct RTPS_DllAPI ThreadSettings
* @brief The thread's priority.
*
* Configures the thread's priority.
* A value of 0 indicates system default.
* A value of -2^31 indicates system default.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
int32_t priority = 0;
int32_t priority = std::numeric_limits<int32_t>::min();

/**
* @brief The thread's core affinity.
* @brief The thread's affinity.
*
* cpu_mask is a bit mask for setting the threads affinity to each core individually.
* On some systems (Windows, Linux), this is a bit mask for setting the threads affinity to each core individually.
* On MacOS, this sets the affinity tag for the thread, and the OS tries to share the L2 cache between threads
* with the same affinity.
* A value of 0 indicates no particular affinity.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
uint32_t cpu_mask = 0;
uint64_t affinity = 0;

/**
* @brief The thread's stack size in bytes.
Expand Down
18 changes: 12 additions & 6 deletions include/fastdds/rtps/resources/ResourceEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

#include <atomic>
#include <functional>
#include <thread>
#include <vector>

#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {
Expand All @@ -51,11 +52,16 @@ class ResourceEvent
/*!
* @brief Method to initialize the internal thread.
*
* @param[in] configure_cb Function to be called in the context of the started thread
* before calling the internal service routine.
* @param[in] thread_cfg Settings to apply to the created thread.
* @param[in] name_fmt A null-terminated string to be used as the format argument of
* a `snprintf` like function, taking `thread_id` as additional
* argument, and used to give a name to the created thread.
* @param[in] thread_id Single variadic argument passed to the formatting function.
*/
void init_thread(
std::function<void()> configure_cb = {});
const fastdds::rtps::ThreadSettings& thread_cfg = {},
const char* name_fmt = "event %u",
uint32_t thread_id = 0);

void stop_thread();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PortBasedTransportDescriptor : public TransportDescriptorInterface
* @return The ThreadSettings for the given port.
*/
virtual RTPS_DllAPI const ThreadSettings& get_thread_config_for_port(
uint32_t port);
uint32_t port) const;

virtual RTPS_DllAPI bool set_thread_config_for_port(
const uint32_t& port,
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/fastdds/domain/DomainParticipantFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
#include <utils/SystemInfo.hpp>
#include <utils/shared_memory/SharedMemWatchdog.hpp>

using namespace eprosima::fastrtps::xmlparser;

Expand Down Expand Up @@ -156,6 +157,8 @@ DomainParticipant* DomainParticipantFactory::create_participant(
{
load_profiles();

RTPSDomain::set_filewatch_thread_config(factory_qos_.file_watch_threads(), factory_qos_.file_watch_threads());

const DomainParticipantQos& pqos = (&qos == &PARTICIPANT_QOS_DEFAULT) ? default_participant_qos_ : qos;

DomainParticipant* dom_part = new DomainParticipant(mask);
Expand Down Expand Up @@ -422,6 +425,8 @@ void DomainParticipantFactory::set_qos(
(void) first_time;
//As all the Qos can always be updated and none of them need to be sent
to = from;

rtps::SharedMemWatchdog::set_thread_settings(to.shm_watchdog_thread());
}

ReturnCode_t DomainParticipantFactory::check_qos(
Expand Down
37 changes: 18 additions & 19 deletions src/cpp/fastdds/log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,17 @@ struct LogResources
void SetThreadConfig(
const rtps::ThreadSettings& config)
{
static_cast<void>(config);
return;
std::lock_guard<std::mutex> guard(cv_mutex_);
thread_settings_ = config;
}

//! Returns the logging_ engine to configuration defaults.
void Reset()
{
std::unique_lock<std::mutex> configGuard(config_mutex_);
rtps::ThreadSettings thr_config{};
SetThreadConfig(thr_config);

std::lock_guard<std::mutex> configGuard(config_mutex_);
category_filter_.reset();
filename_filter_.reset();
error_string_filter_.reset();
Expand All @@ -162,7 +165,7 @@ struct LogResources
{
std::unique_lock<std::mutex> guard(cv_mutex_);

if (!logging_ && !logging_thread_)
if (!logging_ && !logging_thread_.joinable())
{
// already killed
return;
Expand Down Expand Up @@ -237,19 +240,13 @@ struct LogResources
work_ = false;
}

if (logging_thread_)
if (logging_thread_.joinable())
{
cv_.notify_all();
// The #ifdef workaround here is due to an unsolved MSVC bug, which Microsoft has announced
// they have no intention of solving: https://connect.microsoft.com/VisualStudio/feedback/details/747145
// Each VS version deals with post-main deallocation of threads in a very different way.
#if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800
if (logging_thread_->joinable() && logging_thread_->get_id() != std::this_thread::get_id())
if (logging_thread_.get_id() != std::this_thread::get_id())
{
logging_thread_->join();
logging_thread_.join();
}
#endif // if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800
logging_thread_.reset();
}
}

Expand All @@ -258,17 +255,19 @@ struct LogResources
void StartThread()
{
std::unique_lock<std::mutex> guard(cv_mutex_);
if (!logging_ && !logging_thread_)
if (!logging_ && !logging_thread_.joinable())
{
logging_ = true;
logging_thread_.reset(new std::thread(&LogResources::run, this));
auto thread_fn = [this]()
{
run();
};
logging_thread_ = eprosima::create_thread(thread_fn, thread_settings_, "dds.log");
}
}

void run()
{
set_name_to_current_thread("dds.log");

std::unique_lock<std::mutex> guard(cv_mutex_);

while (logging_)
Expand Down Expand Up @@ -343,7 +342,7 @@ struct LogResources

fastrtps::DBQueue<Log::Entry> logs_;
std::vector<std::unique_ptr<LogConsumer>> consumers_;
std::unique_ptr<std::thread> logging_thread_;
std::thread logging_thread_;

// Condition variable segment.
std::condition_variable cv_;
Expand All @@ -361,7 +360,7 @@ struct LogResources
std::unique_ptr<std::regex> error_string_filter_;

std::atomic<Log::Kind> verbosity_;

rtps::ThreadSettings thread_settings_;
};

std::shared_ptr<LogResources> get_log_resources()
Expand Down
18 changes: 8 additions & 10 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace rtps {
DataSharingListener::DataSharingListener(
std::shared_ptr<DataSharingNotification> notification,
const std::string& datasharing_pools_directory,
const fastdds::rtps::ThreadSettings& thr_config,
ResourceLimitedContainerConfig limits,
RTPSReader* reader)
: notification_(notification)
Expand All @@ -39,6 +40,7 @@ DataSharingListener::DataSharingListener(
, writer_pools_(limits)
, writer_pools_changed_(false)
, datasharing_pools_directory_(datasharing_pools_directory)
, thread_config_(thr_config)
{
}

Expand All @@ -50,8 +52,6 @@ DataSharingListener::~DataSharingListener()

void DataSharingListener::run()
{
set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF);

std::unique_lock<Segment::mutex> lock(notification_->notification_->notification_mutex, std::defer_lock);
while (is_running_.load())
{
Expand Down Expand Up @@ -100,13 +100,15 @@ void DataSharingListener::start()
}

// Initialize the thread
listening_thread_ = new std::thread(&DataSharingListener::run, this);
uint32_t thread_id = reader_->getGuid().entityId.to_uint32() & 0x0000FFFF;
listening_thread_ = create_thread([this]()
{
run();
}, thread_config_, "dds.dsha.%u", thread_id);
}

void DataSharingListener::stop()
{
std::thread* thr = nullptr;

{
std::lock_guard<std::mutex> guard(mutex_);

Expand All @@ -116,15 +118,11 @@ void DataSharingListener::stop()
{
return;
}

thr = listening_thread_;
listening_thread_ = nullptr;
}

// Notify the thread and wait for it to finish
notification_->notify();
thr->join();
delete thr;
listening_thread_.join();
}

void DataSharingListener::process_new_data ()
Expand Down
16 changes: 10 additions & 6 deletions src/cpp/rtps/DataSharing/DataSharingListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
#ifndef RTPS_DATASHARING_DATASHARINGLISTENER_HPP
#define RTPS_DATASHARING_DATASHARINGLISTENER_HPP

#include <atomic>
#include <map>
#include <memory>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <rtps/DataSharing/IDataSharingListener.hpp>
#include <rtps/DataSharing/DataSharingNotification.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <memory>
#include <atomic>
#include <map>

namespace eprosima {
namespace fastrtps {
Expand All @@ -46,6 +48,7 @@ class DataSharingListener : public IDataSharingListener
DataSharingListener(
std::shared_ptr<DataSharingNotification> notification,
const std::string& datasharing_pools_directory,
const fastdds::rtps::ThreadSettings& thr_config,
ResourceLimitedContainerConfig limits,
RTPSReader* reader);

Expand Down Expand Up @@ -111,10 +114,11 @@ class DataSharingListener : public IDataSharingListener
std::shared_ptr<DataSharingNotification> notification_;
std::atomic<bool> is_running_;
RTPSReader* reader_;
std::thread* listening_thread_;
std::thread listening_thread_;
ResourceLimitedVector<WriterInfo> writer_pools_;
std::atomic<bool> writer_pools_changed_;
std::string datasharing_pools_directory_;
fastdds::rtps::ThreadSettings thread_config_;
mutable std::mutex mutex_;

};
Expand Down
21 changes: 20 additions & 1 deletion src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ std::shared_ptr<RTPSDomainImpl> RTPSDomainImpl::get_instance()
return instance;
}

void RTPSDomain::set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread)
{
RTPSDomainImpl::set_filewatch_thread_config(watch_thread, callback_thread);
}

void RTPSDomain::stopAll()
{
RTPSDomainImpl::stopAll();
Expand Down Expand Up @@ -143,8 +150,10 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(
std::string filename = SystemInfo::get_environment_file();
if (!filename.empty() && SystemInfo::file_exists(filename))
{
std::lock_guard<std::mutex> guard(instance->m_mutex);
// Create filewatch
instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback);
instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback,
instance->watch_thread_config_, instance->callback_thread_config_);
}
else if (!filename.empty())
{
Expand Down Expand Up @@ -778,6 +787,16 @@ void RTPSDomainImpl::file_watch_callback()
}
}

void RTPSDomainImpl::set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread)
{
auto instance = get_instance();
std::lock_guard<std::mutex> guard(instance->m_mutex);
instance->watch_thread_config_ = watch_thread;
instance->callback_thread_config_ = callback_thread;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Loading

0 comments on commit 77f2c8a

Please sign in to comment.