diff --git a/srtcore/api.cpp b/srtcore/api.cpp index bb5dd64fe..d121fca1e 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -191,7 +191,6 @@ srt::CUDTUnited::CUDTUnited() // be a problem in general. setupMutex(m_GCStopLock, "GCStop"); setupCond(m_GCStopCond, "GCStop"); - setupMutex(m_GlobControlLock, "GlobControl"); setupMutex(m_IDLock, "ID"); setupMutex(m_InitLock, "Init"); } @@ -206,7 +205,6 @@ srt::CUDTUnited::~CUDTUnited() cleanup(); } - releaseMutex(m_GlobControlLock); releaseMutex(m_IDLock); releaseMutex(m_InitLock); // XXX There's some weird bug here causing this @@ -352,44 +350,31 @@ SRTSOCKET srt::CUDTUnited::generateSocketID(bool for_group) int startval = sockval; for (;;) // Roll until an unused value is found { - enterCS(m_GlobControlLock); - const bool exists = -#if ENABLE_BONDING - for_group - ? m_Groups.count(sockval | SRTGROUP_MASK) - : -#endif - m_Sockets.count(sockval); - leaveCS(m_GlobControlLock); - - if (exists) + if (!checkSocketExists(sockval, for_group)) { - // The socket value is in use. - --sockval; - if (sockval <= 0) - sockval = MAX_SOCKET_VAL; - - // Before continuing, check if we haven't rolled back to start again - // This is virtually impossible, so just make an RTI error. - if (sockval == startval) - { - // Of course, we don't lack memory, but actually this is so impossible - // that a complete memory extinction is much more possible than this. - // So treat this rather as a formal fallback for something that "should - // never happen". This should make the socket creation functions, from - // socket_create and accept, return this error. - - m_SocketIDGenerator = sockval + 1; // so that any next call will cause the same error - throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0); - } - - // try again, if this is a free socket - continue; + // No socket found, this ID is free to use + m_SocketIDGenerator = sockval; + break; + } + + // The socket value is in use. + --sockval; + if (sockval <= 0) + sockval = MAX_SOCKET_VAL; + + // Before continuing, check if we haven't rolled back to start again + // This is virtually impossible, so just make an RTI error. + if (sockval == startval) + { + // Of course, we don't lack memory, but actually this is so impossible + // that a complete memory extinction is much more possible than this. + // So treat this rather as a formal fallback for something that "should + // never happen". This should make the socket creation functions, from + // socket_create and accept, return this error. + + m_SocketIDGenerator = sockval + 1; // so that any next call will cause the same error + throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0); } - - // No socket found, this ID is free to use - m_SocketIDGenerator = sockval; - break; } } else @@ -447,7 +432,7 @@ SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps) HLOGC(smlog.Debug, log << CONID(ns->m_SocketID) << "newSocket: mapping socket " << ns->m_SocketID); // protect the m_Sockets structure. - ScopedLock cs(m_GlobControlLock); + ExclusiveLock cs(m_GlobControlLock); m_Sockets[ns->m_SocketID] = ns; } catch (...) @@ -599,7 +584,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, HLOGC(cnlog.Debug, log << "newConnection: incoming " << peer.str() << ", mapping socket " << ns->m_SocketID); { - ScopedLock cg(m_GlobControlLock); + ExclusiveLock cg(m_GlobControlLock); m_Sockets[ns->m_SocketID] = ns; } @@ -648,7 +633,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, { // protect the m_PeerRec structure (and group existence) - ScopedLock glock(m_GlobControlLock); + ExclusiveLock glock(m_GlobControlLock); try { HLOGC(cnlog.Debug, log << "newConnection: mapping peer " << ns->m_PeerID @@ -822,7 +807,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, // connect() in UDT code) may fail, in which case this socket should not be // further processed and should be removed. { - ScopedLock cg(m_GlobControlLock); + ExclusiveLock cg(m_GlobControlLock); #if ENABLE_BONDING if (ns->m_GroupOf) @@ -897,7 +882,7 @@ int srt::CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_ SRT_SOCKSTATUS srt::CUDTUnited::getStatus(const SRTSOCKET u) { // protects the m_Sockets structure - ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); sockets_t::const_iterator i = m_Sockets.find(u); @@ -1161,6 +1146,7 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int } // Set properly the SRTO_GROUPCONNECT flag + // TODO: Socket config must be protected by m_ConnectionLock? s->core().m_config.iGroupConnect = 0; // Check if LISTENER has the SRTO_GROUPCONNECT flag set, @@ -1171,7 +1157,7 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int { // Put a lock to protect the group against accidental deletion // in the meantime. - ScopedLock glock(m_GlobControlLock); + SharedLock glock(m_GlobControlLock); // Check again; it's unlikely to happen, but // it's a theoretically possible scenario if (s->m_GroupOf) @@ -1451,7 +1437,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i } { - ScopedLock cs(m_GlobControlLock); + ExclusiveLock cs(m_GlobControlLock); if (m_Sockets.count(sid) == 0) { HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " deleted in process"); @@ -1557,7 +1543,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i targets[tii].errorcode = e.getErrorCode(); targets[tii].id = CUDT::INVALID_SOCK; - ScopedLock cl(m_GlobControlLock); + ExclusiveLock cl(m_GlobControlLock); ns->removeFromGroup(false); m_Sockets.erase(ns->m_SocketID); // Intercept to delete the socket on failure. @@ -1569,7 +1555,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn"); targets[tii].errorcode = SRT_ESYSOBJ; targets[tii].id = CUDT::INVALID_SOCK; - ScopedLock cl(m_GlobControlLock); + ExclusiveLock cl(m_GlobControlLock); ns->removeFromGroup(false); m_Sockets.erase(ns->m_SocketID); // Intercept to delete the socket on failure. @@ -1943,7 +1929,7 @@ void srt::CUDTUnited::deleteGroup(CUDTGroup* g) { using srt_logging::gmlog; - srt::sync::ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); return deleteGroup_LOCKED(g); } @@ -2034,7 +2020,7 @@ int srt::CUDTUnited::close(CUDTSocket* s) HLOGC(smlog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->core().CONID() << "Acquiring GLOBAL control lock"); - ScopedLock manager_cg(m_GlobControlLock); + ExclusiveLock manager_cg(m_GlobControlLock); // since "s" is located before m_GlobControlLock, locate it again in case // it became invalid // XXX This is very weird; if we state that the CUDTSocket object @@ -2105,7 +2091,7 @@ int srt::CUDTUnited::close(CUDTSocket* s) // Done the other way, but still done. You can stop waiting. bool isgone = false; { - ScopedLock manager_cg(m_GlobControlLock); + SharedLock manager_cg(m_GlobControlLock); isgone = m_ClosedSockets.count(u) == 0; } if (!isgone) @@ -2505,9 +2491,22 @@ int srt::CUDTUnited::epoll_release(const int eid) return m_EPoll.release(eid); } +bool srt::CUDTUnited::checkSocketExists(const SRTSOCKET u, bool isGroup) const +{ + SharedLock cg(m_GlobControlLock); + const bool exists = +#if ENABLE_BONDING + isGroup + ? m_Groups.count(u | SRTGROUP_MASK) + : +#endif + m_Sockets.count(u); + return exists; +} + srt::CUDTSocket* srt::CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh) { - ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); CUDTSocket* s = locateSocket_LOCKED(u); if (!s) { @@ -2535,7 +2534,7 @@ srt::CUDTSocket* srt::CUDTUnited::locateSocket_LOCKED(SRTSOCKET u) #if ENABLE_BONDING srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling erh) { - ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); const groups_t::iterator i = m_Groups.find(u); if (i == m_Groups.end()) @@ -2552,7 +2551,7 @@ srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling e srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s) { - ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); CUDTGroup* g = s->m_GroupOf; if (!g) return NULL; @@ -2566,7 +2565,7 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s) srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh) { - ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); CUDTSocket* s = locateSocket_LOCKED(u); if (!s) @@ -2589,7 +2588,7 @@ bool srt::CUDTUnited::acquireSocket(CUDTSocket* s) // directly from m_Sockets, or even better, has been acquired // by some other functionality already, which is only about to // be released earlier than you need. - ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); s->apiAcquire(); // Keep the lock so that no one changes anything in the meantime. // If the socket m_Status == SRTS_CLOSED (set by setClosed()), then @@ -2605,7 +2604,7 @@ bool srt::CUDTUnited::acquireSocket(CUDTSocket* s) srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn) { - ScopedLock cg(m_GlobControlLock); + SharedLock cg(m_GlobControlLock); map >::iterator i = m_PeerRec.find(CUDTSocket::getPeerSpec(id, isn)); if (i == m_PeerRec.end()) @@ -2629,7 +2628,7 @@ srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRT void srt::CUDTUnited::checkBrokenSockets() { - ScopedLock cg(m_GlobControlLock); + ExclusiveLock cg(m_GlobControlLock); #if ENABLE_BONDING vector delgids; @@ -2875,20 +2874,18 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) // delete this one m_ClosedSockets.erase(i); - // XXX This below section can unlock m_GlobControlLock - // just for calling CUDT::closeInternal(), which is needed - // to avoid locking m_ConnectionLock after m_GlobControlLock, - // while m_ConnectionLock orders BEFORE m_GlobControlLock. - // This should be perfectly safe thing to do after the socket - // ID has been erased from m_ClosedSockets. No container access - // is done in this case. - // - // Report: P04-1.28, P04-2.27, P04-2.50, P04-2.55 + // XXX This below section can unlock m_GlobControlLock + // just for calling CUDT::closeInternal(), which is needed + // to avoid locking m_ConnectionLock after m_GlobControlLock, + // while m_ConnectionLock orders BEFORE m_GlobControlLock. + // This should be perfectly safe thing to do after the socket + // ID has been erased from m_ClosedSockets. No container access + // is done in this case. + // + // Report: P04-1.28, P04-2.27, P04-2.50, P04-2.55 HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u); - leaveCS(m_GlobControlLock); s->core().closeInternal(); - enterCS(m_GlobControlLock); HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u); delete s; HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer."); @@ -2983,7 +2980,7 @@ bool srt::CUDTUnited::channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, cons void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, const UDPSOCKET* udpsock /*[[nullable]]*/) { - ScopedLock cg(m_GlobControlLock); + ExclusiveLock cg(m_GlobControlLock); // If udpsock is provided, then this socket will be simply // taken for binding as a good deal. It would be nice to make @@ -3290,7 +3287,8 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons // multiplexer wasn't found by id, the search by port number continues. bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls) { - ScopedLock cg(m_GlobControlLock); + // TODO: Must be SharedLock, but multiplexer is not thread-safe otherwise. + ExclusiveLock cg(m_GlobControlLock); const int port = ls->m_SelfAddr.hport(); HLOGC(smlog.Debug, @@ -3396,7 +3394,7 @@ void* srt::CUDTUnited::garbageCollect(void* p) HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock..."); { - ScopedLock glock(self->m_GlobControlLock); + ExclusiveLock glock(self->m_GlobControlLock); for (sockets_t::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++i) { @@ -3440,9 +3438,9 @@ void* srt::CUDTUnited::garbageCollect(void* p) { self->checkBrokenSockets(); - enterCS(self->m_GlobControlLock); + self->m_GlobControlLock.lock_shared(); bool empty = self->m_ClosedSockets.empty(); - leaveCS(self->m_GlobControlLock); + self->m_GlobControlLock.unlock_shared(); if (empty) break; @@ -3523,7 +3521,7 @@ SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt) try { - srt::sync::ScopedLock globlock(uglobal().m_GlobControlLock); + ExclusiveLock globlock(uglobal().m_GlobControlLock); return newGroup(gt).id(); // Note: potentially, after this function exits, the group // could be deleted, immediately, from a separate thread (tho @@ -3579,7 +3577,7 @@ SRTSOCKET srt::CUDT::getGroupOfSocket(SRTSOCKET socket) { // Lock this for the whole function as we need the group // to persist the call. - ScopedLock glock(uglobal().m_GlobControlLock); + SharedLock glock(uglobal().m_GlobControlLock); CUDTSocket* s = uglobal().locateSocket_LOCKED(socket); if (!s || !s->m_GroupOf) return APIError(MJ_NOTSUP, MN_INVAL, 0); diff --git a/srtcore/api.h b/srtcore/api.h index b5d6be915..f39756dbc 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -407,7 +407,12 @@ class CUDTUnited groups_t m_Groups; #endif - sync::Mutex m_GlobControlLock; // used to synchronize UDT API + /// Guards all member containers of `CUDTUnited`. Protect UDT API from data races. + /// Non-exclusive lock prohibits changes of containers (insert/remove), but allows modifications + /// of the contained objects (sockets, groups). + /// Exclusive lock is required for changes of the containers (insert/remove). + /// NB! Changes to the elements of the m_mMultiplexer must be protected exclusively. + mutable sync::SharedMutex m_GlobControlLock; sync::Mutex m_IDLock; // used to synchronize ID generation @@ -421,6 +426,8 @@ class CUDTUnited private: friend struct FLookupSocketWithEvent_LOCKED; + bool checkSocketExists(const SRTSOCKET u, bool isGroup) const; + CUDTSocket* locateSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN); // This function does the same as locateSocket, except that: // - lock on m_GlobControlLock is expected (so that you don't unlock between finding and using) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index eca2b2069..60badfe74 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -1734,7 +1734,7 @@ bool srt::CUDT::createSrtHandshake( if (have_group) { // NOTE: See information about mutex ordering in api.h - ScopedLock gdrg (uglobal().m_GlobControlLock); + SharedLock gdrg (uglobal().m_GlobControlLock); if (!m_parent->m_GroupOf) { // This may only happen if since last check of m_GroupOf pointer the socket was removed @@ -3160,7 +3160,8 @@ bool srt::CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_A return false; } - ScopedLock guard_group_existence (uglobal().m_GlobControlLock); + // makeMePeerOf may add a new group to m_Groups. + ExclusiveLock guard_group_existence (uglobal().m_GlobControlLock); if (m_SrtHsSide == HSD_INITIATOR) { @@ -4815,7 +4816,7 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous, { #if ENABLE_BONDING - ScopedLock cl (uglobal().m_GlobControlLock); + SharedLock cl (uglobal().m_GlobControlLock); CUDTGroup* g = m_parent->m_GroupOf; if (g) { @@ -4922,10 +4923,12 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous, //int token = -1; #if ENABLE_BONDING { - ScopedLock cl (uglobal().m_GlobControlLock); + SharedLock cl (uglobal().m_GlobControlLock); CUDTGroup* g = m_parent->m_GroupOf; if (g) { + // TODO: Lock this group `g`?? + // XXX this might require another check of group type. // For redundancy group, at least, update the status in the group. @@ -8168,7 +8171,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) // can be either never set, already reset, or ever set // and possibly dangling. The re-check after lock eliminates // the dangling case. - ScopedLock glock (uglobal().m_GlobControlLock); + SharedLock glock (uglobal().m_GlobControlLock); // Note that updateLatestRcv will lock m_GroupOf->m_GroupLock, // but this is an intended order. @@ -8229,7 +8232,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) if (group_read_seq != SRT_SEQNO_NONE && m_parent->m_GroupOf) { // See above explanation for double-checking - ScopedLock glock (uglobal().m_GlobControlLock); + SharedLock glock (uglobal().m_GlobControlLock); if (m_parent->m_GroupOf) { @@ -8385,7 +8388,7 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) { // m_RecvAckLock is ordered AFTER m_GlobControlLock, so this can only // be done now that m_RecvAckLock is unlocked. - ScopedLock glock (uglobal().m_GlobControlLock); + SharedLock glock (uglobal().m_GlobControlLock); if (m_parent->m_GroupOf) { HLOGC(inlog.Debug, log << CONID() << "ACK: acking group sender buffer for #" << msgno_at_last_acked_seq); @@ -8540,7 +8543,7 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_ #if ENABLE_BONDING if (m_parent->m_GroupOf) { - ScopedLock glock (uglobal().m_GlobControlLock); + SharedLock glock (uglobal().m_GlobControlLock); if (m_parent->m_GroupOf) { // Will apply m_GroupLock, ordered after m_GlobControlLock. @@ -8761,7 +8764,7 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr if (m_config.bDriftTracer) { #if ENABLE_BONDING - ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive? + SharedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive? const bool drift_updated = #endif m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); @@ -10458,7 +10461,7 @@ int srt::CUDT::processData(CUnit* in_unit) // reception sequence pointer stating that this link is not receiving. if (m_parent->m_GroupOf) { - ScopedLock protect_group_existence (uglobal().m_GlobControlLock); + SharedLock protect_group_existence (uglobal().m_GlobControlLock); groups::SocketData* gi = m_parent->m_GroupMemberData; // This check is needed as after getting the lock the socket @@ -11666,7 +11669,7 @@ void srt::CUDT::checkTimers() #if ENABLE_BONDING if (m_parent->m_GroupOf) { - ScopedLock glock (uglobal().m_GlobControlLock); + SharedLock glock (uglobal().m_GlobControlLock); if (m_parent->m_GroupOf) { // Pass socket ID because it's about changing group socket data @@ -11697,7 +11700,7 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode) #if ENABLE_BONDING bool pending_broken = false; { - ScopedLock guard_group_existence (uglobal().m_GlobControlLock); + SharedLock guard_group_existence (uglobal().m_GlobControlLock); if (m_parent->m_GroupOf) { token = m_parent->m_GroupMemberData->token; @@ -11732,7 +11735,7 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode) // existence of the group will not be changed during // the operation. The attempt of group deletion will // have to wait until this operation completes. - ScopedLock lock(uglobal().m_GlobControlLock); + SharedLock lock(uglobal().m_GlobControlLock); CUDTGroup* pg = m_parent->m_GroupOf; if (pg) { @@ -11990,7 +11993,7 @@ void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArr // existence of the group will not be changed during // the operation. The attempt of group deletion will // have to wait until this operation completes. - ScopedLock lock(uglobal().m_GlobControlLock); + SharedLock lock(uglobal().m_GlobControlLock); CUDTGroup* pg = m_parent->m_GroupOf; if (pg) { diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 5601cdeee..02494885a 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -959,7 +959,7 @@ void CUDTGroup::close() vector ids; { - ScopedLock glob(CUDT::uglobal().m_GlobControlLock); + SharedLock glob(CUDT::uglobal().m_GlobControlLock); ScopedLock g(m_GroupLock); m_bClosing = true; @@ -1144,12 +1144,12 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) vector activeLinks; // First, acquire GlobControlLock to make sure all member sockets still exist - enterCS(m_Global.m_GlobControlLock); + m_Global.m_GlobControlLock.lock_shared(); ScopedLock guard(m_GroupLock); if (m_bClosing) { - leaveCS(m_Global.m_GlobControlLock); + m_Global.m_GlobControlLock.unlock_shared(); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } @@ -1157,7 +1157,7 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) // LOCKED: GlobControlLock, GroupLock (RIGHT ORDER!) send_CheckValidSockets(); - leaveCS(m_Global.m_GlobControlLock); + m_Global.m_GlobControlLock.unlock_shared(); // LOCKED: GroupLock (only) // Since this moment GlobControlLock may only be locked if GroupLock is unlocked first. @@ -1487,7 +1487,7 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) { { InvertedLock ung (m_GroupLock); - enterCS(CUDT::uglobal().m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.lock_shared(); HLOGC(gslog.Debug, log << "grp/sendBroadcast: Locked GlobControlLock, locking back GroupLock"); } @@ -1539,7 +1539,7 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) } // Now you can leave GlobControlLock, while GroupLock is still locked. - leaveCS(CUDT::uglobal().m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.unlock_shared(); } // Re-check after the waiting lock has been reacquired @@ -2073,7 +2073,7 @@ vector CUDTGroup::recv_WaitForReadReady(const vector& THREAD_RESUMED(); // HERE GlobControlLock is locked first, then GroupLock is applied back - enterCS(CUDT::uglobal().m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.lock_shared(); } // BOTH m_GlobControlLock AND m_GroupLock are locked here. @@ -2083,7 +2083,7 @@ vector CUDTGroup::recv_WaitForReadReady(const vector& { // GlobControlLock is applied manually, so unlock manually. // GroupLock will be unlocked as per scope. - leaveCS(CUDT::uglobal().m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.unlock_shared(); // This can only happen when 0 is passed as timeout and none is ready. // And 0 is passed only in non-blocking mode. So this is none ready in // non-blocking mode. @@ -2141,7 +2141,7 @@ vector CUDTGroup::recv_WaitForReadReady(const vector& } } - leaveCS(CUDT::uglobal().m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.lock_shared(); return readReady; } @@ -2248,7 +2248,7 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno) int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) { // First, acquire GlobControlLock to make sure all member sockets still exist - enterCS(m_Global.m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.lock_shared(); ScopedLock guard(m_GroupLock); if (m_bClosing) @@ -2258,13 +2258,13 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) // must fist wait for being able to acquire this lock. // The group will not be deleted now because it is added usage counter // by this call, but will be released once it exits. - leaveCS(m_Global.m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.unlock_shared(); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } // Now, still under lock, check if all sockets still can be dispatched send_CheckValidSockets(); - leaveCS(m_Global.m_GlobControlLock); + CUDT::uglobal().m_GlobControlLock.unlock_shared(); if (m_bClosing) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); @@ -3274,7 +3274,7 @@ void CUDTGroup::send_CloseBrokenSockets(vector& w_wipeme) // With unlocked GroupLock, we can now lock GlobControlLock. // This is needed to prevent any of them deleted from the container // at the same time. - ScopedLock globlock(CUDT::uglobal().m_GlobControlLock); + SharedLock globlock(CUDT::uglobal().m_GlobControlLock); for (vector::iterator p = w_wipeme.begin(); p != w_wipeme.end(); ++p) { @@ -3311,7 +3311,7 @@ void CUDTGroup::sendBackup_CloseBrokenSockets(SendBackupCtx& w_sendBackupCtx) // With unlocked GroupLock, we can now lock GlobControlLock. // This is needed prevent any of them be deleted from the container // at the same time. - ScopedLock globlock(CUDT::uglobal().m_GlobControlLock); + SharedLock globlock(CUDT::uglobal().m_GlobControlLock); typedef vector::const_iterator const_iter_t; for (const_iter_t member = w_sendBackupCtx.memberStates().begin(); member != w_sendBackupCtx.memberStates().end(); ++member) @@ -3641,18 +3641,18 @@ int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc) // [[using assert(this->m_pSndBuffer != nullptr)]]; // First, acquire GlobControlLock to make sure all member sockets still exist - enterCS(m_Global.m_GlobControlLock); + m_Global.m_GlobControlLock.lock_shared(); ScopedLock guard(m_GroupLock); if (m_bClosing) { - leaveCS(m_Global.m_GlobControlLock); + m_Global.m_GlobControlLock.unlock_shared(); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } // Now, still under lock, check if all sockets still can be dispatched send_CheckValidSockets(); - leaveCS(m_Global.m_GlobControlLock); + m_Global.m_GlobControlLock.unlock_shared(); steady_clock::time_point currtime = steady_clock::now();