From 774212fa1852955d1b266ca08d4781c49ca6578f Mon Sep 17 00:00:00 2001 From: Guangqing Chen Date: Wed, 10 May 2023 11:48:12 +0800 Subject: [PATCH] [core] fix applyGroupSequences --- srtcore/core.cpp | 41 +++++------------ srtcore/group.cpp | 111 +++++++++++++--------------------------------- srtcore/group.h | 12 ++--- 3 files changed, 46 insertions(+), 118 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 0e3cce0ee5..465c5c1f75 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3236,7 +3236,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 // Check if there exists a group that this one is a peer of. CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergroup); - bool was_empty = true; if (gp) { if (gp->type() != gtp) @@ -3248,9 +3247,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 } HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group for peer=$" << peergroup << " found: $" << gp->id()); - - if (!gp->groupEmpty()) - was_empty = false; } else { @@ -3273,6 +3269,7 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 gp->set_peerid(peergroup); gp->deriveSettings(this); + gp->syncWithSocket(s->core(), HSD_RESPONDER); // This can only happen on a listener (it's only called on a site that is // HSD_RESPONDER), so it was a response for a groupwise connection. @@ -3284,19 +3281,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 << gp->id()); } - { - ScopedLock glock (*gp->exp_groupLock()); - if (gp->closing()) - { - HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group $" << gp->id() << " is being closed, can't process"); - } - - if (was_empty) - { - gp->syncWithSocket(s->core(), HSD_RESPONDER); - } - } - // Setting non-blocking reading for group socket. s->core().m_config.bSynRecving = false; s->core().m_config.bSynSending = false; @@ -3396,21 +3380,15 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp) // These are the values that are normally set initially by setters. int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck; - if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn))) - { - HLOGC(gmlog.Debug, - log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn - << " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck - << " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")"); + gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn)); + HLOGC(gmlog.Debug, + log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn + << " (shift by " << CSeqNo::seqoff(m_iRcvLastAck, rcv_isn) << ") SND=%" << m_iSndLastAck + << " -> %" << snd_isn << " (shift by " << CSeqNo::seqoff(m_iSndLastAck, snd_isn) << ")"); + if (rcv_isn != m_iRcvLastAck) setInitialRcvSeq(rcv_isn); + if (snd_isn != m_iSndLastAck) setInitialSndSeq(snd_isn); - } - else - { - HLOGC(gmlog.Debug, - log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%" - << m_iSndLastAck); - } } #endif @@ -7686,7 +7664,10 @@ void srt::CUDT::dropToGroupRecvBase() // Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock, // but this is an intended order. if (m_parent->m_GroupOf) + { group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo(); + m_parent->m_GroupOf->updateRcvCurrSeqNo(m_iRcvCurrSeqNo); + } } if (group_recv_base == SRT_SEQNO_NONE) return; diff --git a/srtcore/group.cpp b/srtcore/group.cpp index f4dfba1ba4..f2f04449c5 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -63,81 +63,19 @@ bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof, } // [[using locked(this->m_GroupLock)]]; -bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_t& w_rcv_isn) +void CUDTGroup::applyGroupSequences(SRTSOCKET /* not sure if needed */, int32_t& w_snd_isn, int32_t& w_rcv_isn) { - if (m_bConnected) // You are the first one, no need to change. - { - IF_HEAVY_LOGGING(string update_reason = "what?"); - // Find a socket that is declared connected and is not - // the socket that caused the call. - for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi) - { - if (gi->id == target) - continue; - - CUDT& se = gi->ps->core(); - if (!se.m_bConnected) - continue; - - // Found it. Get the following sequences: - // For sending, the sequence that is about to be sent next. - // For receiving, the sequence of the latest received packet. - - // SndCurrSeqNo is initially set to ISN-1, this next one is - // the sequence that is about to be stamped on the next sent packet - // over that socket. Using this field is safer because it is atomic - // and its affinity is to the same thread as the sending function. - - // NOTE: the groupwise scheduling sequence might have been set - // already. If so, it means that it was set by either: - // - the call of this function on the very first conencted socket (see below) - // - the call to `sendBroadcast` or `sendBackup` - // In both cases, we want THIS EXACTLY value to be reported - if (m_iLastSchedSeqNo != -1) - { - w_snd_isn = m_iLastSchedSeqNo; - IF_HEAVY_LOGGING(update_reason = "GROUPWISE snd-seq"); - } - else - { - w_snd_isn = se.m_iSndNextSeqNo; - - // Write it back to the groupwise scheduling sequence so that - // any next connected socket will take this value as well. - m_iLastSchedSeqNo = w_snd_isn; - IF_HEAVY_LOGGING(update_reason = "existing socket not yet sending"); - } - - // RcvCurrSeqNo is increased by one because it happens that at the - // synchronization moment it's already past reading and delivery. - // This is redundancy, so the redundant socket is connected at the moment - // when the other one is already transmitting, so skipping one packet - // even if later transmitted is less troublesome than requesting a - // "mistakenly seen as lost" packet. - w_rcv_isn = CSeqNo::incseq(se.m_iRcvCurrSeqNo); - - HLOGC(gmlog.Debug, - log << "applyGroupSequences: @" << target << " gets seq from @" << gi->id << " rcv %" << (w_rcv_isn) - << " snd %" << (w_snd_isn) << " as " << update_reason); - return false; - } - } - - // If the GROUP (!) is not connected, or no running/pending socket has been found. - // // That is, given socket is the first one. - // The group data should be set up with its own data. They should already be passed here - // in the variables. - // - // Override the schedule sequence of the group in this case because whatever is set now, - // it's not valid. - - HLOGC(gmlog.Debug, - log << "applyGroupSequences: no socket found connected and transmitting, @" << target - << " not changing sequences, storing snd-seq %" << (w_snd_isn)); - - set_currentSchedSequence(w_snd_isn); - - return true; + // LastSchedSeqNo is initially set to the ISN of first connected socket, + // its' also updated in group send functions to the next scheduling seq. + w_snd_isn = m_iLastSchedSeqNo; + + // RcvCurrSeqNo is increased by one because it happens that at the + // synchronization moment it's already past reading and delivery. + // This is redundancy, so the redundant socket is connected at the moment + // when the other one is already transmitting, so skipping one packet + // even if later transmitted is less troublesome than requesting a + // "mistakenly seen as lost" packet. + w_rcv_isn = CSeqNo::incseq(m_iRcvCurrSeqNo); } // NOTE: This function is now for DEBUG PURPOSES ONLY. @@ -263,7 +201,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_bSynSending(true) , m_bTsbPd(true) , m_bTLPktDrop(true) - , m_iTsbPdDelay_us(0) // m_*EID and m_*Epolld fields will be initialized // in the constructor body. , m_iSndTimeOut(-1) @@ -276,6 +213,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_bClosing(false) , m_iLastSchedSeqNo(SRT_SEQNO_NONE) , m_iLastSchedMsgNo(SRT_MSGNO_NONE) + , m_iRcvCurrSeqNo(SRT_SEQNO_NONE) { setupMutex(m_GroupLock, "Group"); setupMutex(m_RcvDataLock, "RcvData"); @@ -845,6 +783,8 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) set_currentSchedSequence(core.ISN()); } + set_currentRecvSequence(core.m_iRcvCurrSeqNo); + // XXX // Might need further investigation as to whether this isn't // wrong for some cases. By having this -1 here the value will be @@ -857,11 +797,6 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) // // Previous implementation used setting to: core.m_iPeerISN resetInitialRxSequence(); - - // Get the latency (possibly fixed against the opposite side) - // from the first socket (core.m_iTsbPdDelay_ms), - // and set it on the current socket. - set_latency(core.m_iTsbPdDelay_ms * int64_t(1000)); } void CUDTGroup::close() @@ -2089,6 +2024,22 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ } } +void CUDTGroup::updateRcvCurrSeqNo(int32_t seq) +{ + ScopedLock lg(m_GroupLock); + + if (m_iRcvCurrSeqNo == SRT_SEQNO_NONE) + { + LOGC(grlog.Error, + log << "IPE: CUDTGroup::m_iRcvCurrSeqNo was not initialized by the first member, setting to %" << seq); + m_iRcvCurrSeqNo = seq; + } + else if (CSeqNo::seqcmp(seq, m_iRcvCurrSeqNo) > 0) + { + m_iRcvCurrSeqNo = seq; + } +} + int32_t CUDTGroup::getRcvBaseSeqNo() { ScopedLock lg(m_GroupLock); diff --git a/srtcore/group.h b/srtcore/group.h index 09e0722671..61a47c8a91 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -174,11 +174,6 @@ class CUDTGroup // that was disconnected other than immediately closing it. if (m_Group.empty()) { - // When the group is empty, there's no danger that this - // number will collide with any ISN provided by a socket. - // Also since now every socket will derive this ISN. - m_iLastSchedSeqNo = generateISN(); - resetInitialRxSequence(); empty = true; } } @@ -346,6 +341,7 @@ class CUDTGroup /// @param sock member socket ID (unused) /// @param sequence the latest packet sequence number available for reading. void updateReadState(SRTSOCKET sock, int32_t sequence); + void updateRcvCurrSeqNo(int32_t seq); void updateWriteState(); void updateFailedLink(); @@ -630,7 +626,6 @@ class CUDTGroup bool m_bSynSending; bool m_bTsbPd; bool m_bTLPktDrop; - int64_t m_iTsbPdDelay_us; int m_RcvEID; class CEPollDesc* m_RcvEpolld; int m_SndEID; @@ -692,6 +687,7 @@ class CUDTGroup sync::Mutex m_RcvDataLock; sync::atomic m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket sync::atomic m_iLastSchedMsgNo; + sync::atomic m_iRcvCurrSeqNo; // Represetnts the max value of CUDT::m_iRcvCurrSeqNo for all sockets in this group. // Statistics struct Stats @@ -803,7 +799,7 @@ class CUDTGroup // Live state synchronization bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr); - bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn); + void applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn); /// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference. /// @param srcMember a reference for synchronization. @@ -816,8 +812,8 @@ class CUDTGroup SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID); SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type); SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo); + SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentRecvSequence, m_iRcvCurrSeqNo); SRTU_PROPERTY_RRW(std::set&, epollset, m_sPollID); - SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us); SRTU_PROPERTY_RO(bool, closing, m_bClosing); };