diff --git a/docs/dev/low-level-info.md b/docs/dev/low-level-info.md index f7d1c12176..ce0d201b1c 100644 --- a/docs/dev/low-level-info.md +++ b/docs/dev/low-level-info.md @@ -159,8 +159,6 @@ CRcvQueue::worker_TryAsyncRend_OrStore [IF Responder] { CUDT::makeMePeerOf - [LOCKS m_GroupLock] - CUDTGroup::syncWithSocket CUDTGroup::find --> [LOCKED m_GroupLock] } debugGroup -- > [LOCKED m_GroupLock] @@ -189,8 +187,6 @@ CRcvQueue::worker_ProcessConnectionRequest [IF Responder] { CUDT::makeMePeerOf - [LOCKS m_GroupLock] - CUDTGroup::syncWithSocket CUDTGroup::find --> [LOCKED m_GroupLock] } debugGroup -- > [LOCKED m_GroupLock] diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 0e3cce0ee5..0824a0d55c 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3151,10 +3151,10 @@ bool srt::CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_A log << CONID() << "HS/RSP: group $" << pg->id() << " -> peer $" << pg->peerid() << ", copying characteristic data"); - // The call to syncWithSocket is copying + // The call to syncWithFirstSocket is copying // some interesting data from the first connected // socket. This should be only done for the first successful connection. - pg->syncWithSocket(*this, HSD_INITIATOR); + pg->syncWithFirstSocket(*this, HSD_INITIATOR); } // Otherwise the peer id must be the same as existing, otherwise // this group is considered already bound to another peer group. @@ -3236,7 +3236,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 // Check if there exists a group that this one is a peer of. CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergroup); - bool was_empty = true; if (gp) { if (gp->type() != gtp) @@ -3248,9 +3247,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 } HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group for peer=$" << peergroup << " found: $" << gp->id()); - - if (!gp->groupEmpty()) - was_empty = false; } else { @@ -3273,6 +3269,7 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 gp->set_peerid(peergroup); gp->deriveSettings(this); + gp->syncWithFirstSocket(s->core(), HSD_RESPONDER); // This can only happen on a listener (it's only called on a site that is // HSD_RESPONDER), so it was a response for a groupwise connection. @@ -3284,19 +3281,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 << gp->id()); } - { - ScopedLock glock (*gp->exp_groupLock()); - if (gp->closing()) - { - HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group $" << gp->id() << " is being closed, can't process"); - } - - if (was_empty) - { - gp->syncWithSocket(s->core(), HSD_RESPONDER); - } - } - // Setting non-blocking reading for group socket. s->core().m_config.bSynRecving = false; s->core().m_config.bSynSending = false; @@ -3396,21 +3380,15 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp) // These are the values that are normally set initially by setters. int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck; - if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn))) - { - HLOGC(gmlog.Debug, - log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn - << " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck - << " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")"); + gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn)); + HLOGC(gmlog.Debug, + log << CONID() << "synchronizeWithGroup: RCV-ISN=%" << m_iRcvLastAck << " -> %" << rcv_isn << " (shift by " + << CSeqNo::seqoff(m_iRcvLastAck, rcv_isn) << ") SND-ISN=%" << m_iSndLastAck << " -> %" << snd_isn + << " (shift by " << CSeqNo::seqoff(m_iSndLastAck, snd_isn) << ")"); + if (rcv_isn != m_iRcvLastAck) setInitialRcvSeq(rcv_isn); + if (snd_isn != m_iSndLastAck) setInitialSndSeq(snd_isn); - } - else - { - HLOGC(gmlog.Debug, - log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%" - << m_iSndLastAck); - } } #endif @@ -7686,7 +7664,12 @@ void srt::CUDT::dropToGroupRecvBase() // Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock, // but this is an intended order. if (m_parent->m_GroupOf) + { group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo(); + // To reduce overhead, especially the m_GlobControlLock, + // the group wise recv seq is updated here. + m_parent->m_GroupOf->updateRcvCurrSeqNo(m_iRcvCurrSeqNo); + } } if (group_recv_base == SRT_SEQNO_NONE) return; diff --git a/srtcore/group.cpp b/srtcore/group.cpp index f4dfba1ba4..a5d9e17eed 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -63,81 +63,19 @@ bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof, } // [[using locked(this->m_GroupLock)]]; -bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_t& w_rcv_isn) +void CUDTGroup::applyGroupSequences(SRTSOCKET /* not sure if needed */, int32_t& w_snd_isn, int32_t& w_rcv_isn) { - if (m_bConnected) // You are the first one, no need to change. - { - IF_HEAVY_LOGGING(string update_reason = "what?"); - // Find a socket that is declared connected and is not - // the socket that caused the call. - for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi) - { - if (gi->id == target) - continue; - - CUDT& se = gi->ps->core(); - if (!se.m_bConnected) - continue; - - // Found it. Get the following sequences: - // For sending, the sequence that is about to be sent next. - // For receiving, the sequence of the latest received packet. - - // SndCurrSeqNo is initially set to ISN-1, this next one is - // the sequence that is about to be stamped on the next sent packet - // over that socket. Using this field is safer because it is atomic - // and its affinity is to the same thread as the sending function. - - // NOTE: the groupwise scheduling sequence might have been set - // already. If so, it means that it was set by either: - // - the call of this function on the very first conencted socket (see below) - // - the call to `sendBroadcast` or `sendBackup` - // In both cases, we want THIS EXACTLY value to be reported - if (m_iLastSchedSeqNo != -1) - { - w_snd_isn = m_iLastSchedSeqNo; - IF_HEAVY_LOGGING(update_reason = "GROUPWISE snd-seq"); - } - else - { - w_snd_isn = se.m_iSndNextSeqNo; - - // Write it back to the groupwise scheduling sequence so that - // any next connected socket will take this value as well. - m_iLastSchedSeqNo = w_snd_isn; - IF_HEAVY_LOGGING(update_reason = "existing socket not yet sending"); - } - - // RcvCurrSeqNo is increased by one because it happens that at the - // synchronization moment it's already past reading and delivery. - // This is redundancy, so the redundant socket is connected at the moment - // when the other one is already transmitting, so skipping one packet - // even if later transmitted is less troublesome than requesting a - // "mistakenly seen as lost" packet. - w_rcv_isn = CSeqNo::incseq(se.m_iRcvCurrSeqNo); - - HLOGC(gmlog.Debug, - log << "applyGroupSequences: @" << target << " gets seq from @" << gi->id << " rcv %" << (w_rcv_isn) - << " snd %" << (w_snd_isn) << " as " << update_reason); - return false; - } - } - - // If the GROUP (!) is not connected, or no running/pending socket has been found. - // // That is, given socket is the first one. - // The group data should be set up with its own data. They should already be passed here - // in the variables. - // - // Override the schedule sequence of the group in this case because whatever is set now, - // it's not valid. - - HLOGC(gmlog.Debug, - log << "applyGroupSequences: no socket found connected and transmitting, @" << target - << " not changing sequences, storing snd-seq %" << (w_snd_isn)); - - set_currentSchedSequence(w_snd_isn); - - return true; + // LastSchedSeqNo is initially set to the ISN of first connected socket, + // its' also updated in group send functions to the next scheduling seq. + w_snd_isn = m_iLastSchedSeqNo; + + // RcvCurrSeqNo is increased by one because it happens that at the + // synchronization moment it's already past reading and delivery. + // This is redundancy, so the redundant socket is connected at the moment + // when the other one is already transmitting, so skipping one packet + // even if later transmitted is less troublesome than requesting a + // "mistakenly seen as lost" packet. + w_rcv_isn = CSeqNo::incseq(m_iRcvCurrSeqNo); } // NOTE: This function is now for DEBUG PURPOSES ONLY. @@ -263,7 +201,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_bSynSending(true) , m_bTsbPd(true) , m_bTLPktDrop(true) - , m_iTsbPdDelay_us(0) // m_*EID and m_*Epolld fields will be initialized // in the constructor body. , m_iSndTimeOut(-1) @@ -276,6 +213,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_bClosing(false) , m_iLastSchedSeqNo(SRT_SEQNO_NONE) , m_iLastSchedMsgNo(SRT_MSGNO_NONE) + , m_iRcvCurrSeqNo(SRT_SEQNO_NONE) { setupMutex(m_GroupLock, "Group"); setupMutex(m_RcvDataLock, "RcvData"); @@ -833,8 +771,7 @@ SRT_SOCKSTATUS CUDTGroup::getStatus() return SRTS_BROKEN; } -// [[using locked(m_GroupLock)]]; -void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) +void CUDTGroup::syncWithFirstSocket(const CUDT& core, const HandshakeSide side) { if (side == HSD_RESPONDER) { @@ -845,23 +782,14 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) set_currentSchedSequence(core.ISN()); } - // XXX - // Might need further investigation as to whether this isn't - // wrong for some cases. By having this -1 here the value will be - // laziliy set from the first reading one. It is believed that - // it covers all possible scenarios, that is: - // + m_iRcvCurrSeqNo = core.m_iRcvCurrSeqNo.load(); + + // By having this -1 here the value will be laziliy set from the first reading one. + // It is believed that it covers all possible scenarios, that is: // - no readers - no problem! // - have some readers and a new is attached - this is set already // - connect multiple links, but none has read yet - you'll be the first. - // - // Previous implementation used setting to: core.m_iPeerISN - resetInitialRxSequence(); - - // Get the latency (possibly fixed against the opposite side) - // from the first socket (core.m_iTsbPdDelay_ms), - // and set it on the current socket. - set_latency(core.m_iTsbPdDelay_ms * int64_t(1000)); + m_RcvBaseSeqNo = SRT_SEQNO_NONE; } void CUDTGroup::close() @@ -2089,6 +2017,22 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ } } +void CUDTGroup::updateRcvCurrSeqNo(int32_t seq) +{ + ScopedLock lg(m_GroupLock); + + if (m_iRcvCurrSeqNo == SRT_SEQNO_NONE) + { + LOGC(grlog.Error, + log << "IPE: CUDTGroup::m_iRcvCurrSeqNo was not initialized by the first member, setting to %" << seq); + m_iRcvCurrSeqNo = seq; + } + else if (CSeqNo::seqcmp(seq, m_iRcvCurrSeqNo) > 0) + { + m_iRcvCurrSeqNo = seq; + } +} + int32_t CUDTGroup::getRcvBaseSeqNo() { ScopedLock lg(m_GroupLock); diff --git a/srtcore/group.h b/srtcore/group.h index 09e0722671..cd34eb8f9a 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -162,23 +162,8 @@ class CUDTGroup { m_Group.erase(f); - // Reset sequence numbers on a dead group so that they are - // initialized anew with the new alive connection within - // the group. - // XXX The problem is that this should be done after the - // socket is considered DISCONNECTED, not when it's being - // closed. After being disconnected, the sequence numbers - // are no longer valid, and will be reinitialized when the - // socket is connected again. This may stay as is for now - // as in SRT it's not predicted to do anything with the socket - // that was disconnected other than immediately closing it. if (m_Group.empty()) { - // When the group is empty, there's no danger that this - // number will collide with any ISN provided by a socket. - // Also since now every socket will derive this ISN. - m_iLastSchedSeqNo = generateISN(); - resetInitialRxSequence(); empty = true; } } @@ -346,6 +331,7 @@ class CUDTGroup /// @param sock member socket ID (unused) /// @param sequence the latest packet sequence number available for reading. void updateReadState(SRTSOCKET sock, int32_t sequence); + void updateRcvCurrSeqNo(int32_t seq); void updateWriteState(); void updateFailedLink(); @@ -372,7 +358,7 @@ class CUDTGroup /// @param ack The past-the-last-received ACK sequence number void readyPackets(srt::CUDT* core, int32_t ack); - void syncWithSocket(const srt::CUDT& core, const HandshakeSide side); + void syncWithFirstSocket(const srt::CUDT& core, const HandshakeSide side); int getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize); int getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize); @@ -630,7 +616,6 @@ class CUDTGroup bool m_bSynSending; bool m_bTsbPd; bool m_bTLPktDrop; - int64_t m_iTsbPdDelay_us; int m_RcvEID; class CEPollDesc* m_RcvEpolld; int m_SndEID; @@ -692,6 +677,7 @@ class CUDTGroup sync::Mutex m_RcvDataLock; sync::atomic m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket sync::atomic m_iLastSchedMsgNo; + sync::atomic m_iRcvCurrSeqNo; // Represetnts the max value of CUDT::m_iRcvCurrSeqNo for all sockets in this group. // Statistics struct Stats @@ -764,15 +750,6 @@ class CUDTGroup #endif } - void resetInitialRxSequence() - { - // The app-reader doesn't care about the real sequence number. - // The first provided one will be taken as a good deal; even if - // this is going to be past the ISN, at worst it will be caused - // by TLPKTDROP. - m_RcvBaseSeqNo = SRT_SEQNO_NONE; - } - bool applyGroupTime(time_point& w_start_time, time_point& w_peer_start_time) { using srt::sync::is_zero; @@ -803,7 +780,7 @@ class CUDTGroup // Live state synchronization bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr); - bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn); + void applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn); /// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference. /// @param srcMember a reference for synchronization. @@ -817,7 +794,6 @@ class CUDTGroup SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type); SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo); SRTU_PROPERTY_RRW(std::set&, epollset, m_sPollID); - SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us); SRTU_PROPERTY_RO(bool, closing, m_bClosing); };