From 3c7022eb7786b085b9b4332addb74342c2c29ec7 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 8 Oct 2024 16:29:02 +0200 Subject: [PATCH] [core] Improved mutex protection of the TSBPD (#3038). --- srtcore/buffer_tools.cpp | 4 ++-- srtcore/tsbpd_time.cpp | 33 +++++++++++++++++++++------------ srtcore/tsbpd_time.h | 22 ++++++++++++++++++++-- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/srtcore/buffer_tools.cpp b/srtcore/buffer_tools.cpp index 7473e3824..e809b952e 100644 --- a/srtcore/buffer_tools.cpp +++ b/srtcore/buffer_tools.cpp @@ -253,8 +253,8 @@ void CSndRateEstimator::addSample(const time_point& ts, int pkts, size_t bytes) } } - m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes; - m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts; + m_Samples[m_iCurSampleIdx].m_iBytesCount += (int) bytes; + m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts; } int CSndRateEstimator::getCurrentRate() const diff --git a/srtcore/tsbpd_time.cpp b/srtcore/tsbpd_time.cpp index 162fc7ac7..ec0f39f7b 100644 --- a/srtcore/tsbpd_time.cpp +++ b/srtcore/tsbpd_time.cpp @@ -108,7 +108,7 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPkt if (!m_bTsbPdMode) return false; - ScopedLock lck(m_mtxRW); + ExclusiveLock lck(m_mtxRW); // Remember the first RTT sample measured. Ideally we need RTT0 - the one from the handshaking phase, // because TSBPD base is initialized there. But HS-based RTT is not yet implemented. @@ -122,7 +122,7 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPkt // is to estimate RTT change and assume that the change of the one way network delay is // approximated by the half of the RTT change. const duration tdRTTDelta = usRTTSample >= 0 ? microseconds_from((usRTTSample - m_iFirstRTT) / 2) : duration(0); - const time_point tsPktBaseTime = getPktTsbPdBaseTime(usPktTimestamp); + const time_point tsPktBaseTime = getPktTsbPdBaseTimeNoLock(usPktTimestamp); const steady_clock::duration tdDrift = tsPktArrival - tsPktBaseTime - tdRTTDelta; const bool updated = m_DriftTracer.update(count_microseconds(tdDrift)); @@ -158,6 +158,7 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPkt void CTsbpdTime::setTsbPdMode(const steady_clock::time_point& timebase, bool wrap, duration delay) { + ExclusiveLock lck(m_mtxRW); m_bTsbPdMode = true; m_bTsbPdWrapCheck = wrap; @@ -183,6 +184,7 @@ void CTsbpdTime::applyGroupTime(const steady_clock::time_point& timebase, // newly added to the group must get EXACTLY the same internal timebase // or otherwise the TsbPd time calculation will ship different results // on different member sockets. + ExclusiveLock lck(m_mtxRW); m_bTsbPdMode = true; @@ -196,6 +198,7 @@ void CTsbpdTime::applyGroupDrift(const steady_clock::time_point& timebase, bool wrp, const steady_clock::duration& udrift) { + ExclusiveLock lck(m_mtxRW); // This is only when a drift was updated on one of the group members. HLOGC(brlog.Debug, log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift) @@ -207,7 +210,7 @@ void CTsbpdTime::applyGroupDrift(const steady_clock::time_point& timebase, m_DriftTracer.forceDrift(count_microseconds(udrift)); } -CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const +CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBaseNoLock(uint32_t timestamp_us) const { // A data packet within [TSBPD_WRAP_PERIOD; 2 * TSBPD_WRAP_PERIOD] would end TSBPD wrap-aware state. // Some incoming control packets may not update the TSBPD base (calling updateTsbPdTimeBase(..)), @@ -218,20 +221,25 @@ CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const return (m_tsTsbPdTimeBase + microseconds_from(carryover_us)); } -CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const +CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const { - time_point value = getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift()); + SharedLock lck(m_mtxRW); + return getTsbPdTimeBaseNoLock(timestamp_us); +} - /* - HLOGC(brlog.Debug, log << "getPktTsbPdTime:" - << " BASE=" << FormatTime(m_tsTsbPdTimeBase) - << " TS=" << usPktTimestamp << "us, lat=" << FormatDuration(m_tdTsbPdDelay) - << " DRF=" << m_DriftTracer.drift() << "us = " << FormatTime(value)); - */ +CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const +{ + SharedLock lck(m_mtxRW); + time_point value = getPktTsbPdBaseTimeNoLock(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift()); return value; } +CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTimeNoLock(uint32_t usPktTimestamp) const +{ + return getTsbPdTimeBaseNoLock(usPktTimestamp) + microseconds_from(usPktTimestamp); +} + CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTime(uint32_t usPktTimestamp) const { return getTsbPdTimeBase(usPktTimestamp) + microseconds_from(usPktTimestamp); @@ -239,6 +247,7 @@ CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTime(uint32_t usPktTimestamp) void CTsbpdTime::updateTsbPdTimeBase(uint32_t usPktTimestamp) { + ExclusiveLock lck(m_mtxRW); if (m_bTsbPdWrapCheck) { // Wrap check period. @@ -267,7 +276,7 @@ void CTsbpdTime::updateTsbPdTimeBase(uint32_t usPktTimestamp) void CTsbpdTime::getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift) const { - ScopedLock lck(m_mtxRW); + ExclusiveLock lck(m_mtxRW); w_tb = m_tsTsbPdTimeBase; w_udrift = microseconds_from(m_DriftTracer.drift()); w_wrp = m_bTsbPdWrapCheck; diff --git a/srtcore/tsbpd_time.h b/srtcore/tsbpd_time.h index 3483c197f..78cf18d6f 100644 --- a/srtcore/tsbpd_time.h +++ b/srtcore/tsbpd_time.h @@ -26,7 +26,7 @@ class CTsbpdTime typedef srt::sync::steady_clock steady_clock; typedef steady_clock::time_point time_point; typedef steady_clock::duration duration; - typedef srt::sync::Mutex Mutex; + typedef srt::sync::SharedMutex SharedMutex; public: CTsbpdTime() @@ -117,6 +117,24 @@ class CTsbpdTime void getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift) const; private: + /// @brief Get TSBPD base time adjusted for carryover, which occurs when + /// a packet's timestamp exceeds the UINT32_MAX and continues from zero. + /// Does not lock the internal state. + /// @param [in] usPktTimestamp 32-bit value of packet timestamp field (microseconds). + /// + /// @return TSBPD base time for a provided packet timestamp. + time_point getTsbPdTimeBaseNoLock(uint32_t usPktTimestamp) const; + + /// @brief Get packet TSBPD time without buffering delay and clock drift, which is + /// the target time for delivering the packet to an upstream application. + /// Essentially: getTsbPdTimeBase(usPktTimestamp) + usPktTimestamp + /// Does not lock the internal state. + /// @param [in] usPktTimestamp 32-bit value of packet timestamp field (microseconds). + /// + /// @return Packet TSBPD base time without buffering delay. + time_point getPktTsbPdBaseTimeNoLock(uint32_t usPktTimestamp) const; + + int m_iFirstRTT; // First measured RTT sample. bool m_bTsbPdMode; // Receiver buffering and TSBPD is active when true. duration m_tdTsbPdDelay; // Negotiated buffering delay. @@ -155,7 +173,7 @@ class CTsbpdTime DriftTracer m_DriftTracer; /// Protect simultaneous change of state (read/write). - mutable Mutex m_mtxRW; + mutable SharedMutex m_mtxRW; }; } // namespace srt