From 2bf2a29e793872de47375457a256c68435d9a42c Mon Sep 17 00:00:00 2001 From: Matthias Schneider Date: Mon, 2 Sep 2024 16:17:49 +0200 Subject: [PATCH] Improve resilience against clock adjustments (#5018) * Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl) Signed-off-by: Matthias Schneider * Use steady_clock instead for system_clock for calculating timeouts Signed-off-by: Matthias Schneider * Use correct clock's duration for duration_cast Signed-off-by: Matthias Schneider * Use Time_t::now() Signed-off-by: Matthias Schneider * Fix build. Signed-off-by: Miguel Company * Refs #21314. Refactor on DataWriterImpl. Signed-off-by: Miguel Company * Refs #21314. Refactor on DataReaderImpl. Signed-off-by: Miguel Company * Refs #21314. Refactor on StatefulWriter. Signed-off-by: Miguel Company * Refs #21314. Protect current_time_since_unix_epoch against clock adjustments. Signed-off-by: Miguel Company * Revert "Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl)" This reverts commit d69eb91ec60e5a669e4ba69874c4be4355e0df0c. --------- Signed-off-by: Matthias Schneider Signed-off-by: Miguel Company Co-authored-by: Miguel Company (cherry picked from commit ccc690c97728a6414dbcdb757e9fdb35696abfbd) # Conflicts: # include/fastdds/rtps/writer/StatefulWriter.h # src/cpp/fastdds/publisher/DataWriterImpl.cpp # src/cpp/fastdds/subscriber/DataReaderImpl.cpp # src/cpp/utils/time_t_helpers.hpp --- include/fastdds/rtps/writer/StatefulWriter.h | 6 ++ .../fastdds/domain/DomainParticipantImpl.cpp | 9 +-- .../fastdds/domain/DomainParticipantImpl.hpp | 4 +- src/cpp/fastdds/publisher/DataWriterImpl.cpp | 29 ++++++--- src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 54 ++++++++--------- src/cpp/rtps/writer/StatefulWriter.cpp | 48 +++++++-------- src/cpp/utils/SystemInfo.cpp | 8 +-- src/cpp/utils/time_t_helpers.hpp | 59 +++++++++++++++++++ 8 files changed, 142 insertions(+), 75 deletions(-) create mode 100644 src/cpp/utils/time_t_helpers.hpp diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index be8615bd531..58efe0db8a0 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -502,9 +502,15 @@ class StatefulWriter : public RTPSWriter bool disable_heartbeat_piggyback_; //! True to disable positive ACKs bool disable_positive_acks_; +<<<<<<< HEAD:include/fastdds/rtps/writer/StatefulWriter.h //! Keep duration for disable positive ACKs QoS, in microseconds std::chrono::duration> keep_duration_us_; //! Last acknowledged cache change (only used if using disable positive ACKs QoS) +======= + /// Keep duration for disable positive ACKs QoS + fastdds::dds::Duration_t keep_duration_; + /// Last acknowledged cache change (only used if using disable positive ACKs QoS) +>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018)):src/cpp/rtps/writer/StatefulWriter.hpp SequenceNumber_t last_sequence_number_; //! Biggest sequence number removed from history SequenceNumber_t biggest_removed_sequence_number_; diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp index 7be131a0222..21b00217119 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp @@ -1221,14 +1221,7 @@ bool DomainParticipantImpl::contains_entity( ReturnCode_t DomainParticipantImpl::get_current_time( fastrtps::Time_t& current_time) const { - auto now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - auto seconds = std::chrono::duration_cast(duration); - duration -= seconds; - auto nanos = std::chrono::duration_cast(duration); - - current_time.seconds = static_cast(seconds.count()); - current_time.nanosec = static_cast(nanos.count()); + fastdds::dds::Time_t::now(current_time); return ReturnCode_t::RETCODE_OK; } diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp index d9c5df6e5cb..65abe6b762f 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp @@ -111,10 +111,10 @@ class DomainParticipantImpl DomainParticipantListener* listener, const std::chrono::seconds timeout = std::chrono::seconds::max()) { - auto time_out = std::chrono::time_point::max(); + auto time_out = std::chrono::time_point::max(); if (timeout < std::chrono::seconds::max()) { - auto now = std::chrono::system_clock::now(); + auto now = std::chrono::steady_clock::now(); time_out = now + timeout; } diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index de657ea543d..2d02d2969fc 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -48,6 +48,15 @@ #include #include +<<<<<<< HEAD +======= +#include +#include +#include +#include +#include +#include +>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018)) #include #include @@ -989,7 +998,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( { if (!history_.set_next_deadline( handle, - steady_clock::now() + duration_cast(deadline_duration_us_))) + steady_clock::now() + duration_cast(deadline_duration_us_))) { EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history"); } @@ -1429,7 +1438,7 @@ bool DataWriterImpl::deadline_missed() if (!history_.set_next_deadline( timer_owner_, - steady_clock::now() + duration_cast(deadline_duration_us_))) + steady_clock::now() + duration_cast(deadline_duration_us_))) { EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history"); return false; @@ -1479,21 +1488,24 @@ bool DataWriterImpl::lifespan_expired() { std::unique_lock lock(writer_->getMutex()); + fastdds::rtps::Time_t current_ts; + fastdds::rtps::Time_t::now(current_ts); + CacheChange_t* earliest_change; while (history_.get_earliest_change(&earliest_change)) { - auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); + fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration; // Check that the earliest change has expired (the change which started the timer could have been removed from the history) - if (now - source_timestamp < lifespan_duration_us_) + if (current_ts < expiration_ts) { - auto interval = source_timestamp - now + lifespan_duration_us_; - lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); + fastdds::rtps::Time_t interval = expiration_ts - current_ts; + lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6); return true; } // The earliest change has expired +<<<<<<< HEAD history_.remove_change_pub(earliest_change); // Set the timer for the next change if there is one @@ -1512,6 +1524,9 @@ bool DataWriterImpl::lifespan_expired() lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); return true; } +======= + history_->remove_change_pub(earliest_change); +>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018)) } return false; diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index f4aeb8781f1..17b7fb86c89 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -48,8 +48,17 @@ #include #include +<<<<<<< HEAD #include +======= +#include +#include +#include +#include +#include +#include +>>>>>>> ccc690c97 (Improve resilience against clock adjustments (#5018)) #include #include @@ -1046,7 +1055,7 @@ bool DataReaderImpl::on_new_cache_change_added( { if (!history_.set_next_deadline( change->instanceHandle, - steady_clock::now() + duration_cast(deadline_duration_us_))) + steady_clock::now() + duration_cast(deadline_duration_us_))) { EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); } @@ -1065,12 +1074,13 @@ bool DataReaderImpl::on_new_cache_change_added( return true; } - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); + fastdds::rtps::Time_t expiration_ts = change->sourceTimestamp + qos_.lifespan().duration; + fastdds::rtps::Time_t current_ts; + fastdds::rtps::Time_t::now(current_ts); // The new change could have expired if it arrived too late // If so, remove it from the history and return false to avoid notifying the listener - if (now - source_timestamp >= lifespan_duration_us_) + if (expiration_ts < current_ts) { history_.remove_change_sub(new_change); return false; @@ -1092,11 +1102,10 @@ bool DataReaderImpl::on_new_cache_change_added( EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved"); } - auto interval = source_timestamp - now + duration_cast(lifespan_duration_us_); - // Update and restart the timer // If the timer is already running this will not have any effect - lifespan_timer_->update_interval_millisec(interval.count() * 1e-6); + fastdds::rtps::Time_t interval = expiration_ts - current_ts; + lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6); lifespan_timer_->restart_timer(); return true; } @@ -1179,7 +1188,7 @@ bool DataReaderImpl::deadline_missed() if (!history_.set_next_deadline( timer_owner_, - steady_clock::now() + duration_cast(deadline_duration_us_), true)) + steady_clock::now() + duration_cast(deadline_duration_us_), true)) { EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); return false; @@ -1210,17 +1219,19 @@ bool DataReaderImpl::lifespan_expired() { std::unique_lock lock(reader_->getMutex()); + fastdds::rtps::Time_t current_ts; + fastdds::rtps::Time_t::now(current_ts); + CacheChange_t* earliest_change; while (history_.get_earliest_change(&earliest_change)) { - auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); + fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration; // Check that the earliest change has expired (the change which started the timer could have been removed from the history) - if (now - source_timestamp < lifespan_duration_us_) + if (current_ts < expiration_ts) { - auto interval = source_timestamp - now + lifespan_duration_us_; - lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); + fastdds::rtps::Time_t interval = expiration_ts - current_ts; + lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6); return true; } @@ -1228,23 +1239,6 @@ bool DataReaderImpl::lifespan_expired() history_.remove_change_sub(earliest_change); try_notify_read_conditions(); - - // Set the timer for the next change if there is one - if (!history_.get_earliest_change(&earliest_change)) - { - return false; - } - - // Calculate when the next change is due to expire and restart - source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns()); - now = system_clock::now(); - auto interval = source_timestamp - now + lifespan_duration_us_; - - if (interval.count() > 0) - { - lifespan_timer_->update_interval_millisec(static_cast(duration_cast(interval).count())); - return true; - } } return false; diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index f0521d9f711..b304dac85ca 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -259,7 +259,7 @@ StatefulWriter::StatefulWriter( , may_remove_change_(0) , disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback) , disable_positive_acks_(att.disable_positive_acks) - , keep_duration_us_(att.keep_duration.to_ns() * 1e-3) + , keep_duration_(att.keep_duration) , last_sequence_number_() , biggest_removed_sequence_number_() , sendBufferSize_(pimpl->get_min_network_send_buffer_size()) @@ -437,12 +437,13 @@ void StatefulWriter::unsent_change_added_to_history( if (disable_positive_acks_) { - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - auto interval = source_timestamp - now + keep_duration_us_; - assert(interval.count() >= 0); + Time_t expiration_ts = change->sourceTimestamp + keep_duration_; + Time_t current_ts; + Time_t::now(current_ts); + assert(expiration_ts >= current_ts); + auto interval = (expiration_ts - current_ts).to_duration_t(); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + ack_event_->update_interval(interval); ack_event_->restart_timer(max_blocking_time); } @@ -950,12 +951,13 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network( if ( !(ack_event_->getRemainingTimeMilliSec() > 0)) { // Restart ack_timer - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - auto interval = source_timestamp - now + keep_duration_us_; - assert(interval.count() >= 0); + Time_t expiration_ts = change->sourceTimestamp + keep_duration_; + Time_t current_ts; + Time_t::now(current_ts); + assert(expiration_ts >= current_ts); + auto interval = (expiration_ts - current_ts).to_duration_t(); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + ack_event_->update_interval(interval); ack_event_->restart_timer(max_blocking_time); } } @@ -1627,13 +1629,9 @@ void StatefulWriter::updatePositiveAcks( const WriterAttributes& att) { std::lock_guard guard(mp_mutex); - if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3)) - { - // Implicit conversion to microseconds - keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()}; - } + keep_duration_ = att.keep_duration; // Restart ack timer with new duration - ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3); + ack_event_->update_interval(keep_duration_); ack_event_->restart_timer(); } @@ -2044,14 +2042,16 @@ bool StatefulWriter::ack_timer_expired() // The timer has expired so the earliest non-acked change must be marked as acknowledged // This will be done in the first while iteration, as we start with a negative interval - auto interval = -keep_duration_us_; + Time_t expiration_ts; + Time_t current_ts; + Time_t::now(current_ts); // On the other hand, we've seen in the tests that if samples are sent very quickly with little // time between consecutive samples, the timer interval could end up being negative // In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while // loop - while (interval.count() < 0) + do { bool acks_flag = false; for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, @@ -2090,13 +2090,13 @@ bool StatefulWriter::ack_timer_expired() return false; } - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - interval = source_timestamp - now + keep_duration_us_; + Time_t::now(current_ts); + expiration_ts = change->sourceTimestamp + keep_duration_; } - assert(interval.count() >= 0); + while (expiration_ts < current_ts); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + auto interval = (expiration_ts - current_ts).to_duration_t(); + ack_event_->update_interval(interval); return true; } diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index 08c550438a7..5544ead54ee 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -157,7 +157,7 @@ bool SystemInfo::wait_for_file_closure( const std::string& filename, const std::chrono::seconds timeout) { - auto start = std::chrono::system_clock::now(); + auto start = std::chrono::steady_clock::now(); #ifdef _MSC_VER std::ofstream os; @@ -167,7 +167,7 @@ bool SystemInfo::wait_for_file_closure( os.open(filename, std::ios::out | std::ios::app, _SH_DENYWR); if (!os.is_open() // If the file is lock-opened in an external editor do not hang - && (std::chrono::system_clock::now() - start) < timeout ) + && (std::chrono::steady_clock::now() - start) < timeout ) { std::this_thread::yield(); } @@ -182,7 +182,7 @@ bool SystemInfo::wait_for_file_closure( while (flock(fd, LOCK_EX | LOCK_NB) // If the file is lock-opened in an external editor do not hang - && (std::chrono::system_clock::now() - start) < timeout ) + && (std::chrono::steady_clock::now() - start) < timeout ) { std::this_thread::yield(); } @@ -197,7 +197,7 @@ bool SystemInfo::wait_for_file_closure( (void)filename; #endif // ifdef _MSC_VER - return std::chrono::system_clock::now() - start < timeout; + return std::chrono::steady_clock::now() - start < timeout; } ReturnCode_t SystemInfo::set_environment_file() diff --git a/src/cpp/utils/time_t_helpers.hpp b/src/cpp/utils/time_t_helpers.hpp new file mode 100644 index 00000000000..7ee8a023eb7 --- /dev/null +++ b/src/cpp/utils/time_t_helpers.hpp @@ -0,0 +1,59 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef FASTDDS_UTILS__TIMETHELPERS_HPP +#define FASTDDS_UTILS__TIMETHELPERS_HPP + +#include +#include +// unnamed namespace for inline functions in compilation unit. Better practice than static inline. + +constexpr uint64_t C_FRACTIONS_PER_SEC = 4294967296ULL; +constexpr uint64_t C_NANOSECONDS_PER_SEC = 1000000000ULL; + +inline uint32_t frac_to_nano( + uint32_t fractions) +{ + return static_cast((fractions * C_NANOSECONDS_PER_SEC) / C_FRACTIONS_PER_SEC); +} + +inline uint32_t nano_to_frac( + uint32_t nanosecs) +{ + return static_cast((nanosecs * C_FRACTIONS_PER_SEC) / C_NANOSECONDS_PER_SEC); +} + +static void current_time_since_unix_epoch( + int32_t& secs, + uint32_t& nanosecs) +{ + using namespace std::chrono; + + static const auto init_time_since_epoch = system_clock::now().time_since_epoch(); + static const auto init_steady_time = steady_clock::now(); + + // Get time since epoch + auto t_elapsed = steady_clock::now() - init_steady_time; + auto t_since_epoch = init_time_since_epoch + t_elapsed; + // Get seconds + auto secs_t = duration_cast(t_since_epoch); + // Remove seconds from time + t_since_epoch -= secs_t; + + // Get seconds and nanoseconds + secs = static_cast(secs_t.count()); + nanosecs = static_cast(duration_cast(t_since_epoch).count()); +} + +#endif // FASTDDS_UTILS__TIMETHELPERS_HPP