Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21314] Improve resilience against clock adjustments #5018

Merged
merged 10 commits into from
Sep 2, 2024
9 changes: 1 addition & 8 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1252,14 +1252,7 @@ bool DomainParticipantImpl::contains_entity(
ReturnCode_t DomainParticipantImpl::get_current_time(
fastdds::dds::Time_t& current_time) const
{
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
duration -= seconds;
auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(duration);

current_time.seconds = static_cast<int32_t>(seconds.count());
current_time.nanosec = static_cast<uint32_t>(nanos.count());
fastdds::dds::Time_t::now(current_time);

return RETCODE_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ class DomainParticipantImpl
DomainParticipantListener* listener,
const std::chrono::seconds timeout = std::chrono::seconds::max())
{
auto time_out = std::chrono::time_point<std::chrono::system_clock>::max();
auto time_out = std::chrono::time_point<std::chrono::steady_clock>::max();
if (timeout < std::chrono::seconds::max())
{
auto now = std::chrono::system_clock::now();
auto now = std::chrono::steady_clock::now();
time_out = now + timeout;
}

Expand Down
34 changes: 10 additions & 24 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <fastdds/publisher/filtering/DataWriterFilteredChangePool.hpp>
#include <fastdds/publisher/PublisherImpl.hpp>
#include <fastdds/rtps/builtin/data/TopicDescription.hpp>
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
#include <fastdds/rtps/RTPSDomain.hpp>
#include <fastdds/rtps/writer/RTPSWriter.hpp>
Expand Down Expand Up @@ -1076,7 +1077,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
{
if (!history_->set_next_deadline(
handle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
}
Expand Down Expand Up @@ -1547,7 +1548,7 @@ bool DataWriterImpl::deadline_missed()

if (!history_->set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
return false;
Expand Down Expand Up @@ -1597,39 +1598,24 @@ bool DataWriterImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_->get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
history_->remove_change_pub(earliest_change);

// Set the timer for the next change if there is one
if (!history_->get_earliest_change(&earliest_change))
{
return false;
}

// Calculate when the next change is due to expire and restart
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
now = system_clock::now();
auto interval = source_timestamp - now + lifespan_duration_us_;

if (interval.count() > 0)
{
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
}

return false;
Expand Down
46 changes: 16 additions & 30 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/domain/DomainParticipantImpl.hpp>
#include <fastdds/rtps/builtin/data/TopicDescription.hpp>
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
#include <fastdds/rtps/reader/RTPSReader.hpp>
#include <fastdds/rtps/RTPSDomain.hpp>
Expand Down Expand Up @@ -1115,7 +1116,7 @@ bool DataReaderImpl::on_new_cache_change_added(
{
if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
}
Expand All @@ -1134,12 +1135,13 @@ bool DataReaderImpl::on_new_cache_change_added(
return true;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = change->sourceTimestamp + qos_.lifespan().duration;
fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

// The new change could have expired if it arrived too late
// If so, remove it from the history and return false to avoid notifying the listener
if (now - source_timestamp >= lifespan_duration_us_)
if (expiration_ts < current_ts)
{
history_.remove_change_sub(new_change);
return false;
Expand All @@ -1161,11 +1163,10 @@ bool DataReaderImpl::on_new_cache_change_added(
EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved");
}

auto interval = source_timestamp - now + duration_cast<nanoseconds>(lifespan_duration_us_);

// Update and restart the timer
// If the timer is already running this will not have any effect
lifespan_timer_->update_interval_millisec(interval.count() * 1e-6);
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
lifespan_timer_->restart_timer();
return true;
}
Expand Down Expand Up @@ -1253,7 +1254,7 @@ bool DataReaderImpl::deadline_missed()

if (!history_.set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_), true))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_), true))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
return false;
Expand Down Expand Up @@ -1284,41 +1285,26 @@ bool DataReaderImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_.get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
history_.remove_change_sub(earliest_change);

try_notify_read_conditions();

// Set the timer for the next change if there is one
if (!history_.get_earliest_change(&earliest_change))
{
return false;
}

// Calculate when the next change is due to expire and restart
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
now = system_clock::now();
auto interval = source_timestamp - now + lifespan_duration_us_;

if (interval.count() > 0)
{
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
}

return false;
Expand Down
14 changes: 7 additions & 7 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class SharedMemGlobal

struct PortNode
{
alignas(8) std::atomic<std::chrono::high_resolution_clock::rep> last_listeners_status_check_time_ms;
alignas(8) std::atomic<std::chrono::steady_clock::rep> last_listeners_status_check_time_ms;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
alignas(8) std::atomic<uint32_t> ref_counter;

SharedMemSegment::Offset buffer;
Expand Down Expand Up @@ -324,17 +324,17 @@ class SharedMemGlobal

port_node->last_listeners_status_check_time_ms.exchange(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count());
std::chrono::steady_clock::now().time_since_epoch()).count());

return listeners_found == port_node->num_listeners;
}

void run()
{
auto now = std::chrono::high_resolution_clock::now();
auto now = std::chrono::steady_clock::now();

auto timeout_elapsed = [](
std::chrono::high_resolution_clock::time_point& now,
std::chrono::steady_clock::time_point& now,
const PortContext& port_context)
{
return std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count()
Expand Down Expand Up @@ -869,14 +869,14 @@ class SharedMemGlobal
throw std::runtime_error("port is marked as not ok");
}

auto t0 = std::chrono::high_resolution_clock::now();
auto t0 = std::chrono::steady_clock::now();

// If in any moment during the timeout all waiting listeners are OK
// then the port is OK
bool is_check_ok = false;
while ( !is_check_ok &&
std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - t0).count() < node_->healthy_check_timeout_ms)
(std::chrono::steady_clock::now() - t0).count() < node_->healthy_check_timeout_ms)
{
{
std::unique_lock<SharedMemSegment::mutex> lock(node_->empty_cv_mutex);
Expand Down Expand Up @@ -1279,7 +1279,7 @@ class SharedMemGlobal
port_node->healthy_check_timeout_ms = healthy_check_timeout_ms;
port_node->last_listeners_status_check_time_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count();
std::chrono::steady_clock::now().time_since_epoch()).count();
port_node->port_wait_timeout_ms = healthy_check_timeout_ms / 3;
port_node->max_buffer_descriptors = max_buffer_descriptors;
std::fill_n(port_node->listeners_status, PortNode::LISTENERS_STATUS_SIZE, PortNode::ListenerStatus());
Expand Down
48 changes: 24 additions & 24 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ StatefulWriter::StatefulWriter(
, may_remove_change_(0)
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
, disable_positive_acks_(att.disable_positive_acks)
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
, keep_duration_(att.keep_duration)
, last_sequence_number_()
, biggest_removed_sequence_number_()
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
Expand Down Expand Up @@ -370,12 +370,13 @@ void StatefulWriter::unsent_change_added_to_history(

if (disable_positive_acks_)
{
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}

Expand Down Expand Up @@ -890,12 +891,13 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}
}
Expand Down Expand Up @@ -1606,13 +1608,9 @@ void StatefulWriter::update_positive_acks_times(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
}
keep_duration_ = att.keep_duration;
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->update_interval(keep_duration_);
ack_event_->restart_timer();
}

Expand Down Expand Up @@ -2025,14 +2023,16 @@ bool StatefulWriter::ack_timer_expired()
// The timer has expired so the earliest non-acked change must be marked as acknowledged
// This will be done in the first while iteration, as we start with a negative interval

auto interval = -keep_duration_us_;
Time_t expiration_ts;
Time_t current_ts;
Time_t::now(current_ts);

// On the other hand, we've seen in the tests that if samples are sent very quickly with little
// time between consecutive samples, the timer interval could end up being negative
// In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while
// loop

while (interval.count() < 0)
do
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
Expand Down Expand Up @@ -2071,13 +2071,13 @@ bool StatefulWriter::ack_timer_expired()
return false;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
interval = source_timestamp - now + keep_duration_us_;
Time_t::now(current_ts);
expiration_ts = change->sourceTimestamp + keep_duration_;
}
assert(interval.count() >= 0);
while (expiration_ts < current_ts);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
auto interval = (expiration_ts - current_ts).to_duration_t();
ack_event_->update_interval(interval);
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/writer/StatefulWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ class StatefulWriter : public BaseWriter
bool disable_heartbeat_piggyback_;
/// True to disable positive ACKs
bool disable_positive_acks_;
/// Keep duration for disable positive ACKs QoS, in microseconds
std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
/// Keep duration for disable positive ACKs QoS
fastdds::dds::Duration_t keep_duration_;
/// Last acknowledged cache change (only used if using disable positive ACKs QoS)
SequenceNumber_t last_sequence_number_;
/// Biggest sequence number removed from history
Expand Down
Loading