Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Converted GlobControlLock to a read-write lock. #3014

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 73 additions & 75 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ srt::CUDTUnited::CUDTUnited()
// be a problem in general.
setupMutex(m_GCStopLock, "GCStop");
setupCond(m_GCStopCond, "GCStop");
setupMutex(m_GlobControlLock, "GlobControl");
setupMutex(m_IDLock, "ID");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to decide something about these calls. With the current locking implementation these are not needed and have been left here to allow implementation of the mutex tracker, which is still hanging around in one of the PRs. So, either add an appropriate empty function with that name to cover the stub, or we'll need to remove them all.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a problem with an initialization of a conditional variable. There is a difference in POSIX on how you initialize the one in a global / static scope, and the one in runtime. That's what setupCond do on POSIX builds. And there was a crash if that initialization was done in the constructor.
setupMutex is indeed not needed except for the mutex tracker.

void Condition::init()
{
    pthread_condattr_t* attr = NULL;
#if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_GETTIME_MONOTONIC
    pthread_condattr_t  CondAttribs;
    pthread_condattr_init(&CondAttribs);
    pthread_condattr_setclock(&CondAttribs, CLOCK_MONOTONIC);
    attr = &CondAttribs;
#endif
    const int res = pthread_cond_init(&m_cv, attr);
    if (res != 0)
        throw std::runtime_error("pthread_cond_init monotonic failed");
}

setupMutex(m_InitLock, "Init");
}
Expand All @@ -206,7 +205,6 @@ srt::CUDTUnited::~CUDTUnited()
cleanup();
}

releaseMutex(m_GlobControlLock);
releaseMutex(m_IDLock);
releaseMutex(m_InitLock);
// XXX There's some weird bug here causing this
Expand Down Expand Up @@ -352,44 +350,31 @@ SRTSOCKET srt::CUDTUnited::generateSocketID(bool for_group)
int startval = sockval;
for (;;) // Roll until an unused value is found
{
enterCS(m_GlobControlLock);
const bool exists =
#if ENABLE_BONDING
for_group
? m_Groups.count(sockval | SRTGROUP_MASK)
:
#endif
m_Sockets.count(sockval);
leaveCS(m_GlobControlLock);

if (exists)
if (!checkSocketExists(sockval, for_group))
{
// The socket value is in use.
--sockval;
if (sockval <= 0)
sockval = MAX_SOCKET_VAL;

// Before continuing, check if we haven't rolled back to start again
// This is virtually impossible, so just make an RTI error.
if (sockval == startval)
{
// Of course, we don't lack memory, but actually this is so impossible
// that a complete memory extinction is much more possible than this.
// So treat this rather as a formal fallback for something that "should
// never happen". This should make the socket creation functions, from
// socket_create and accept, return this error.

m_SocketIDGenerator = sockval + 1; // so that any next call will cause the same error
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}

// try again, if this is a free socket
continue;
// No socket found, this ID is free to use
m_SocketIDGenerator = sockval;
break;
}

// The socket value is in use.
--sockval;
if (sockval <= 0)
sockval = MAX_SOCKET_VAL;

// Before continuing, check if we haven't rolled back to start again
// This is virtually impossible, so just make an RTI error.
if (sockval == startval)
{
// Of course, we don't lack memory, but actually this is so impossible
// that a complete memory extinction is much more possible than this.
// So treat this rather as a formal fallback for something that "should
// never happen". This should make the socket creation functions, from
// socket_create and accept, return this error.

m_SocketIDGenerator = sockval + 1; // so that any next call will cause the same error
throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}

// No socket found, this ID is free to use
m_SocketIDGenerator = sockval;
break;
}
}
else
Expand Down Expand Up @@ -447,7 +432,7 @@ SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps)
HLOGC(smlog.Debug, log << CONID(ns->m_SocketID) << "newSocket: mapping socket " << ns->m_SocketID);

// protect the m_Sockets structure.
ScopedLock cs(m_GlobControlLock);
ExclusiveLock cs(m_GlobControlLock);
m_Sockets[ns->m_SocketID] = ns;
}
catch (...)
Expand Down Expand Up @@ -599,7 +584,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
HLOGC(cnlog.Debug, log <<
"newConnection: incoming " << peer.str() << ", mapping socket " << ns->m_SocketID);
{
ScopedLock cg(m_GlobControlLock);
ExclusiveLock cg(m_GlobControlLock);
m_Sockets[ns->m_SocketID] = ns;
}

Expand Down Expand Up @@ -648,7 +633,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,

{
// protect the m_PeerRec structure (and group existence)
ScopedLock glock(m_GlobControlLock);
ExclusiveLock glock(m_GlobControlLock);
try
{
HLOGC(cnlog.Debug, log << "newConnection: mapping peer " << ns->m_PeerID
Expand Down Expand Up @@ -822,7 +807,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
// connect() in UDT code) may fail, in which case this socket should not be
// further processed and should be removed.
{
ScopedLock cg(m_GlobControlLock);
ExclusiveLock cg(m_GlobControlLock);

#if ENABLE_BONDING
if (ns->m_GroupOf)
Expand Down Expand Up @@ -897,7 +882,7 @@ int srt::CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_
SRT_SOCKSTATUS srt::CUDTUnited::getStatus(const SRTSOCKET u)
{
// protects the m_Sockets structure
ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);

sockets_t::const_iterator i = m_Sockets.find(u);

Expand Down Expand Up @@ -1161,6 +1146,7 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int
}

// Set properly the SRTO_GROUPCONNECT flag
// TODO: Socket config must be protected by m_ConnectionLock?
s->core().m_config.iGroupConnect = 0;

// Check if LISTENER has the SRTO_GROUPCONNECT flag set,
Expand All @@ -1171,7 +1157,7 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int
{
// Put a lock to protect the group against accidental deletion
// in the meantime.
ScopedLock glock(m_GlobControlLock);
SharedLock glock(m_GlobControlLock);
// Check again; it's unlikely to happen, but
// it's a theoretically possible scenario
if (s->m_GroupOf)
Expand Down Expand Up @@ -1451,7 +1437,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
}

{
ScopedLock cs(m_GlobControlLock);
ExclusiveLock cs(m_GlobControlLock);
if (m_Sockets.count(sid) == 0)
{
HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " deleted in process");
Expand Down Expand Up @@ -1557,7 +1543,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
targets[tii].errorcode = e.getErrorCode();
targets[tii].id = CUDT::INVALID_SOCK;

ScopedLock cl(m_GlobControlLock);
ExclusiveLock cl(m_GlobControlLock);
ns->removeFromGroup(false);
m_Sockets.erase(ns->m_SocketID);
// Intercept to delete the socket on failure.
Expand All @@ -1569,7 +1555,7 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
targets[tii].errorcode = SRT_ESYSOBJ;
targets[tii].id = CUDT::INVALID_SOCK;
ScopedLock cl(m_GlobControlLock);
ExclusiveLock cl(m_GlobControlLock);
ns->removeFromGroup(false);
m_Sockets.erase(ns->m_SocketID);
// Intercept to delete the socket on failure.
Expand Down Expand Up @@ -1943,7 +1929,7 @@ void srt::CUDTUnited::deleteGroup(CUDTGroup* g)
{
using srt_logging::gmlog;

srt::sync::ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);
return deleteGroup_LOCKED(g);
}

Expand Down Expand Up @@ -2034,7 +2020,7 @@ int srt::CUDTUnited::close(CUDTSocket* s)
HLOGC(smlog.Debug,
log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->core().CONID()
<< "Acquiring GLOBAL control lock");
ScopedLock manager_cg(m_GlobControlLock);
ExclusiveLock manager_cg(m_GlobControlLock);
// since "s" is located before m_GlobControlLock, locate it again in case
// it became invalid
// XXX This is very weird; if we state that the CUDTSocket object
Expand Down Expand Up @@ -2105,7 +2091,7 @@ int srt::CUDTUnited::close(CUDTSocket* s)
// Done the other way, but still done. You can stop waiting.
bool isgone = false;
{
ScopedLock manager_cg(m_GlobControlLock);
SharedLock manager_cg(m_GlobControlLock);
isgone = m_ClosedSockets.count(u) == 0;
}
if (!isgone)
Expand Down Expand Up @@ -2505,9 +2491,22 @@ int srt::CUDTUnited::epoll_release(const int eid)
return m_EPoll.release(eid);
}

bool srt::CUDTUnited::checkSocketExists(const SRTSOCKET u, bool isGroup) const
{
SharedLock cg(m_GlobControlLock);
const bool exists =
#if ENABLE_BONDING
isGroup
? m_Groups.count(u | SRTGROUP_MASK)
:
#endif
m_Sockets.count(u);
return exists;
}

srt::CUDTSocket* srt::CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);
CUDTSocket* s = locateSocket_LOCKED(u);
if (!s)
{
Expand Down Expand Up @@ -2535,7 +2534,7 @@ srt::CUDTSocket* srt::CUDTUnited::locateSocket_LOCKED(SRTSOCKET u)
#if ENABLE_BONDING
srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);

const groups_t::iterator i = m_Groups.find(u);
if (i == m_Groups.end())
Expand All @@ -2552,7 +2551,7 @@ srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling e

srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
{
ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);
CUDTGroup* g = s->m_GroupOf;
if (!g)
return NULL;
Expand All @@ -2566,7 +2565,7 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)

srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);

CUDTSocket* s = locateSocket_LOCKED(u);
if (!s)
Expand All @@ -2589,7 +2588,7 @@ bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
// directly from m_Sockets, or even better, has been acquired
// by some other functionality already, which is only about to
// be released earlier than you need.
ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);
s->apiAcquire();
// Keep the lock so that no one changes anything in the meantime.
// If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
Expand All @@ -2605,7 +2604,7 @@ bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)

srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
{
ScopedLock cg(m_GlobControlLock);
SharedLock cg(m_GlobControlLock);

map<int64_t, set<SRTSOCKET> >::iterator i = m_PeerRec.find(CUDTSocket::getPeerSpec(id, isn));
if (i == m_PeerRec.end())
Expand All @@ -2629,7 +2628,7 @@ srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRT

void srt::CUDTUnited::checkBrokenSockets()
{
ScopedLock cg(m_GlobControlLock);
ExclusiveLock cg(m_GlobControlLock);

#if ENABLE_BONDING
vector<SRTSOCKET> delgids;
Expand Down Expand Up @@ -2875,20 +2874,18 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
// delete this one
m_ClosedSockets.erase(i);

// XXX This below section can unlock m_GlobControlLock
// just for calling CUDT::closeInternal(), which is needed
// to avoid locking m_ConnectionLock after m_GlobControlLock,
// while m_ConnectionLock orders BEFORE m_GlobControlLock.
// This should be perfectly safe thing to do after the socket
// ID has been erased from m_ClosedSockets. No container access
// is done in this case.
//
// Report: P04-1.28, P04-2.27, P04-2.50, P04-2.55
// XXX This below section can unlock m_GlobControlLock
// just for calling CUDT::closeInternal(), which is needed
// to avoid locking m_ConnectionLock after m_GlobControlLock,
// while m_ConnectionLock orders BEFORE m_GlobControlLock.
// This should be perfectly safe thing to do after the socket
// ID has been erased from m_ClosedSockets. No container access
// is done in this case.
//
// Report: P04-1.28, P04-2.27, P04-2.50, P04-2.55

HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
leaveCS(m_GlobControlLock);
s->core().closeInternal();
enterCS(m_GlobControlLock);
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was here to fix the problem of locking m_ConnectionLock after m_GlobControlLock. Why is that no longer necessary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GlobControl "resource" is now held in a shared manner protecting containers from modification, but there is no mutex being locked. Hence no need for this unlock-lock trick.

delete s;
HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");
Expand Down Expand Up @@ -2983,7 +2980,7 @@ bool srt::CUDTUnited::channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, cons

void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, const UDPSOCKET* udpsock /*[[nullable]]*/)
{
ScopedLock cg(m_GlobControlLock);
ExclusiveLock cg(m_GlobControlLock);

// If udpsock is provided, then this socket will be simply
// taken for binding as a good deal. It would be nice to make
Expand Down Expand Up @@ -3290,7 +3287,8 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, cons
// multiplexer wasn't found by id, the search by port number continues.
bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
{
ScopedLock cg(m_GlobControlLock);
// TODO: Must be SharedLock, but multiplexer is not thread-safe otherwise.
ExclusiveLock cg(m_GlobControlLock);
const int port = ls->m_SelfAddr.hport();

HLOGC(smlog.Debug,
Expand Down Expand Up @@ -3396,7 +3394,7 @@ void* srt::CUDTUnited::garbageCollect(void* p)
HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");

{
ScopedLock glock(self->m_GlobControlLock);
ExclusiveLock glock(self->m_GlobControlLock);

for (sockets_t::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++i)
{
Expand Down Expand Up @@ -3440,9 +3438,9 @@ void* srt::CUDTUnited::garbageCollect(void* p)
{
self->checkBrokenSockets();

enterCS(self->m_GlobControlLock);
self->m_GlobControlLock.lock_shared();
bool empty = self->m_ClosedSockets.empty();
leaveCS(self->m_GlobControlLock);
self->m_GlobControlLock.unlock_shared();

if (empty)
break;
Expand Down Expand Up @@ -3523,7 +3521,7 @@ SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)

try
{
srt::sync::ScopedLock globlock(uglobal().m_GlobControlLock);
ExclusiveLock globlock(uglobal().m_GlobControlLock);
return newGroup(gt).id();
// Note: potentially, after this function exits, the group
// could be deleted, immediately, from a separate thread (tho
Expand Down Expand Up @@ -3579,7 +3577,7 @@ SRTSOCKET srt::CUDT::getGroupOfSocket(SRTSOCKET socket)
{
// Lock this for the whole function as we need the group
// to persist the call.
ScopedLock glock(uglobal().m_GlobControlLock);
SharedLock glock(uglobal().m_GlobControlLock);
CUDTSocket* s = uglobal().locateSocket_LOCKED(socket);
if (!s || !s->m_GroupOf)
return APIError(MJ_NOTSUP, MN_INVAL, 0);
Expand Down
9 changes: 8 additions & 1 deletion srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,12 @@ class CUDTUnited
groups_t m_Groups;
#endif

sync::Mutex m_GlobControlLock; // used to synchronize UDT API
/// Guards all member containers of `CUDTUnited`. Protect UDT API from data races.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UDT???

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Guards all member containers of `CUDTUnited`. Protect UDT API from data races.
/// Guards all member containers of `CUDTUnited`. Protect SRT API from data races.

/// Non-exclusive lock prohibits changes of containers (insert/remove), but allows modifications
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Shared lock" would be not unconfusing.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Non-exclusive lock prohibits changes of containers (insert/remove), but allows modifications
/// A shared (non-exclusive) lock prohibits changes of containers (insert/remove), but allows modifications

/// of the contained objects (sockets, groups).
/// Exclusive lock is required for changes of the containers (insert/remove).
/// NB! Changes to the elements of the m_mMultiplexer must be protected exclusively.
mutable sync::SharedMutex m_GlobControlLock;

sync::Mutex m_IDLock; // used to synchronize ID generation

Expand All @@ -421,6 +426,8 @@ class CUDTUnited
private:
friend struct FLookupSocketWithEvent_LOCKED;

bool checkSocketExists(const SRTSOCKET u, bool isGroup) const;

CUDTSocket* locateSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
// This function does the same as locateSocket, except that:
// - lock on m_GlobControlLock is expected (so that you don't unlock between finding and using)
Expand Down
Loading
Loading