Skip to content

Commit

Permalink
[core] fix applyGroupSequences
Browse files Browse the repository at this point in the history
  • Loading branch information
gou4shi1 committed May 11, 2023
1 parent 3cefede commit 774212f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 118 deletions.
41 changes: 11 additions & 30 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3236,7 +3236,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3

// Check if there exists a group that this one is a peer of.
CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergroup);
bool was_empty = true;
if (gp)
{
if (gp->type() != gtp)
Expand All @@ -3248,9 +3247,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
}

HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group for peer=$" << peergroup << " found: $" << gp->id());

if (!gp->groupEmpty())
was_empty = false;
}
else
{
Expand All @@ -3273,6 +3269,7 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3

gp->set_peerid(peergroup);
gp->deriveSettings(this);
gp->syncWithSocket(s->core(), HSD_RESPONDER);

// This can only happen on a listener (it's only called on a site that is
// HSD_RESPONDER), so it was a response for a groupwise connection.
Expand All @@ -3284,19 +3281,6 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3
<< gp->id());
}

{
ScopedLock glock (*gp->exp_groupLock());
if (gp->closing())
{
HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: group $" << gp->id() << " is being closed, can't process");
}

if (was_empty)
{
gp->syncWithSocket(s->core(), HSD_RESPONDER);
}
}

// Setting non-blocking reading for group socket.
s->core().m_config.bSynRecving = false;
s->core().m_config.bSynSending = false;
Expand Down Expand Up @@ -3396,21 +3380,15 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp)

// These are the values that are normally set initially by setters.
int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck;
if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn)))
{
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn
<< " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) << ") SND=%" << m_iSndLastAck
<< " -> %" << snd_isn << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")");
gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn));
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn
<< " (shift by " << CSeqNo::seqoff(m_iRcvLastAck, rcv_isn) << ") SND=%" << m_iSndLastAck
<< " -> %" << snd_isn << " (shift by " << CSeqNo::seqoff(m_iSndLastAck, snd_isn) << ")");
if (rcv_isn != m_iRcvLastAck)
setInitialRcvSeq(rcv_isn);
if (snd_isn != m_iSndLastAck)
setInitialSndSeq(snd_isn);
}
else
{
HLOGC(gmlog.Debug,
log << CONID() << "synchronizeWithGroup: DEFINED ISN: RCV=%" << m_iRcvLastAck << " SND=%"
<< m_iSndLastAck);
}
}
#endif

Expand Down Expand Up @@ -7686,7 +7664,10 @@ void srt::CUDT::dropToGroupRecvBase()
// Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock,
// but this is an intended order.
if (m_parent->m_GroupOf)
{
group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo();
m_parent->m_GroupOf->updateRcvCurrSeqNo(m_iRcvCurrSeqNo);
}
}
if (group_recv_base == SRT_SEQNO_NONE)
return;
Expand Down
111 changes: 31 additions & 80 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,81 +63,19 @@ bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof,
}

// [[using locked(this->m_GroupLock)]];
bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_t& w_rcv_isn)
void CUDTGroup::applyGroupSequences(SRTSOCKET /* not sure if needed */, int32_t& w_snd_isn, int32_t& w_rcv_isn)
{
if (m_bConnected) // You are the first one, no need to change.
{
IF_HEAVY_LOGGING(string update_reason = "what?");
// Find a socket that is declared connected and is not
// the socket that caused the call.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->id == target)
continue;

CUDT& se = gi->ps->core();
if (!se.m_bConnected)
continue;

// Found it. Get the following sequences:
// For sending, the sequence that is about to be sent next.
// For receiving, the sequence of the latest received packet.

// SndCurrSeqNo is initially set to ISN-1, this next one is
// the sequence that is about to be stamped on the next sent packet
// over that socket. Using this field is safer because it is atomic
// and its affinity is to the same thread as the sending function.

// NOTE: the groupwise scheduling sequence might have been set
// already. If so, it means that it was set by either:
// - the call of this function on the very first conencted socket (see below)
// - the call to `sendBroadcast` or `sendBackup`
// In both cases, we want THIS EXACTLY value to be reported
if (m_iLastSchedSeqNo != -1)
{
w_snd_isn = m_iLastSchedSeqNo;
IF_HEAVY_LOGGING(update_reason = "GROUPWISE snd-seq");
}
else
{
w_snd_isn = se.m_iSndNextSeqNo;

// Write it back to the groupwise scheduling sequence so that
// any next connected socket will take this value as well.
m_iLastSchedSeqNo = w_snd_isn;
IF_HEAVY_LOGGING(update_reason = "existing socket not yet sending");
}

// RcvCurrSeqNo is increased by one because it happens that at the
// synchronization moment it's already past reading and delivery.
// This is redundancy, so the redundant socket is connected at the moment
// when the other one is already transmitting, so skipping one packet
// even if later transmitted is less troublesome than requesting a
// "mistakenly seen as lost" packet.
w_rcv_isn = CSeqNo::incseq(se.m_iRcvCurrSeqNo);

HLOGC(gmlog.Debug,
log << "applyGroupSequences: @" << target << " gets seq from @" << gi->id << " rcv %" << (w_rcv_isn)
<< " snd %" << (w_snd_isn) << " as " << update_reason);
return false;
}
}

// If the GROUP (!) is not connected, or no running/pending socket has been found.
// // That is, given socket is the first one.
// The group data should be set up with its own data. They should already be passed here
// in the variables.
//
// Override the schedule sequence of the group in this case because whatever is set now,
// it's not valid.

HLOGC(gmlog.Debug,
log << "applyGroupSequences: no socket found connected and transmitting, @" << target
<< " not changing sequences, storing snd-seq %" << (w_snd_isn));

set_currentSchedSequence(w_snd_isn);

return true;
// LastSchedSeqNo is initially set to the ISN of first connected socket,
// its' also updated in group send functions to the next scheduling seq.
w_snd_isn = m_iLastSchedSeqNo;

// RcvCurrSeqNo is increased by one because it happens that at the
// synchronization moment it's already past reading and delivery.
// This is redundancy, so the redundant socket is connected at the moment
// when the other one is already transmitting, so skipping one packet
// even if later transmitted is less troublesome than requesting a
// "mistakenly seen as lost" packet.
w_rcv_isn = CSeqNo::incseq(m_iRcvCurrSeqNo);
}

// NOTE: This function is now for DEBUG PURPOSES ONLY.
Expand Down Expand Up @@ -263,7 +201,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_bSynSending(true)
, m_bTsbPd(true)
, m_bTLPktDrop(true)
, m_iTsbPdDelay_us(0)
// m_*EID and m_*Epolld fields will be initialized
// in the constructor body.
, m_iSndTimeOut(-1)
Expand All @@ -276,6 +213,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_bClosing(false)
, m_iLastSchedSeqNo(SRT_SEQNO_NONE)
, m_iLastSchedMsgNo(SRT_MSGNO_NONE)
, m_iRcvCurrSeqNo(SRT_SEQNO_NONE)
{
setupMutex(m_GroupLock, "Group");
setupMutex(m_RcvDataLock, "RcvData");
Expand Down Expand Up @@ -845,6 +783,8 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
set_currentSchedSequence(core.ISN());
}

set_currentRecvSequence(core.m_iRcvCurrSeqNo);

// XXX
// Might need further investigation as to whether this isn't
// wrong for some cases. By having this -1 here the value will be
Expand All @@ -857,11 +797,6 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
//
// Previous implementation used setting to: core.m_iPeerISN
resetInitialRxSequence();

// Get the latency (possibly fixed against the opposite side)
// from the first socket (core.m_iTsbPdDelay_ms),
// and set it on the current socket.
set_latency(core.m_iTsbPdDelay_ms * int64_t(1000));
}

void CUDTGroup::close()
Expand Down Expand Up @@ -2089,6 +2024,22 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ
}
}

void CUDTGroup::updateRcvCurrSeqNo(int32_t seq)
{
ScopedLock lg(m_GroupLock);

if (m_iRcvCurrSeqNo == SRT_SEQNO_NONE)
{
LOGC(grlog.Error,
log << "IPE: CUDTGroup::m_iRcvCurrSeqNo was not initialized by the first member, setting to %" << seq);
m_iRcvCurrSeqNo = seq;
}
else if (CSeqNo::seqcmp(seq, m_iRcvCurrSeqNo) > 0)
{
m_iRcvCurrSeqNo = seq;
}
}

int32_t CUDTGroup::getRcvBaseSeqNo()
{
ScopedLock lg(m_GroupLock);
Expand Down
12 changes: 4 additions & 8 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ class CUDTGroup
// that was disconnected other than immediately closing it.
if (m_Group.empty())
{
// When the group is empty, there's no danger that this
// number will collide with any ISN provided by a socket.
// Also since now every socket will derive this ISN.
m_iLastSchedSeqNo = generateISN();
resetInitialRxSequence();
empty = true;
}
}
Expand Down Expand Up @@ -346,6 +341,7 @@ class CUDTGroup
/// @param sock member socket ID (unused)
/// @param sequence the latest packet sequence number available for reading.
void updateReadState(SRTSOCKET sock, int32_t sequence);
void updateRcvCurrSeqNo(int32_t seq);

void updateWriteState();
void updateFailedLink();
Expand Down Expand Up @@ -630,7 +626,6 @@ class CUDTGroup
bool m_bSynSending;
bool m_bTsbPd;
bool m_bTLPktDrop;
int64_t m_iTsbPdDelay_us;
int m_RcvEID;
class CEPollDesc* m_RcvEpolld;
int m_SndEID;
Expand Down Expand Up @@ -692,6 +687,7 @@ class CUDTGroup
sync::Mutex m_RcvDataLock;
sync::atomic<int32_t> m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket
sync::atomic<int32_t> m_iLastSchedMsgNo;
sync::atomic<int32_t> m_iRcvCurrSeqNo; // Represetnts the max value of CUDT::m_iRcvCurrSeqNo for all sockets in this group.
// Statistics

struct Stats
Expand Down Expand Up @@ -803,7 +799,7 @@ class CUDTGroup

// Live state synchronization
bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
void applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);

/// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
/// @param srcMember a reference for synchronization.
Expand All @@ -816,8 +812,8 @@ class CUDTGroup
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentRecvSequence, m_iRcvCurrSeqNo);
SRTU_PROPERTY_RRW(std::set<int>&, epollset, m_sPollID);
SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us);
SRTU_PROPERTY_RO(bool, closing, m_bClosing);
};

Expand Down

0 comments on commit 774212f

Please sign in to comment.