Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Added busy counter for sockets and various fixes for data race problems #2893

Merged
merged 27 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0bee6d8
[test] Improved tests to avoid all-breaking asserts and inter-test me…
Feb 27, 2024
d8dfa0c
(First concept, failing tests, to be fixed)
Feb 27, 2024
79bb548
Updated and fixed
Feb 27, 2024
74592c1
Fixed more tests
Feb 27, 2024
2075a65
Fixed tab indent in CMakeLists
ethouris Feb 27, 2024
0f0597a
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 27, 2024
a82fd26
Added extra fixes for data races
Feb 28, 2024
278fc72
Fixed file transmission test on Windows
Feb 28, 2024
9459d36
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 28, 2024
cd5d771
Fixed some build breaks
Feb 28, 2024
085043a
Armed UniqueSocket in file/line grab for creation location
Feb 28, 2024
4638192
Still tracking close error on Ubuntu
Feb 29, 2024
ca98ee3
Added explicit closing of client socket to avoid auto-close-broken pr…
Feb 29, 2024
df2f8e7
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 29, 2024
212750d
Updated and merged
Mar 13, 2024
f56dba5
Updated to latest upstream. Fixed wrong comments
May 7, 2024
82bfbd3
Updated to latest master
Jul 18, 2024
11dd050
Applied fixes from code review
Jul 18, 2024
08a4010
Fixed constness of isStillBusy
Jul 18, 2024
6e2496a
Updated failing test to track the failure cause
Jul 18, 2024
326d324
Some sanitizer and warning fixes
Jul 25, 2024
ed5245d
Updated and fixed
Aug 14, 2024
97b26d9
Added guarding the socket for the whole length of a packet dispatchin…
Aug 15, 2024
9bccc3e
Updated and fixed
Aug 15, 2024
fb816e3
Merge remote-tracking branch 'ethouris/dev-add-socket-busy-counter' i…
Aug 15, 2024
abb99dc
Fixed per code review
Aug 19, 2024
85f7922
Merge remote-tracking branch 'ethouris/dev-add-socket-busy-counter' i…
Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 77 additions & 6 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1913,11 +1913,28 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
return 0;
}
#endif
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
#if ENABLE_HEAVY_LOGGING
// Wrapping the log into a destructor so that it
// is printed AFTER the destructor of SocketKeeper.
struct ScopedExitLog
{
const CUDTSocket* const ps;
ScopedExitLog(const CUDTSocket* p): ps(p){}
~ScopedExitLog()
{
if (ps) // Could be not acquired by SocketKeeper, occasionally
{
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
}
}
};
#endif

SocketKeeper k(*this, u, ERH_THROW);
IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket));
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());

return close(s);
return close(k.socket);
}

#if ENABLE_BONDING
Expand Down Expand Up @@ -2546,6 +2563,45 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
}
#endif

srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);

CUDTSocket* s = locateSocket_LOCKED(u);
if (!s)
{
if (erh == ERH_THROW)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
return NULL;
}

s->apiAcquire();
return s;
}

bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
{
// Note that before using this function you must be certain
// that the socket isn't broken already and it still has at least
// one more GC cycle to live. In other words, you must be certain
// that this pointer passed here isn't dangling and was obtained
// directly from m_Sockets, or even better, has been acquired
// by some other functionality already, which is only about to
// be released earlier than you need.
ScopedLock cg(m_GlobControlLock);
s->apiAcquire();
// Keep the lock so that no one changes anything in the meantime.
// If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
// this socket is no longer present in the m_Sockets container
if (s->m_Status >= SRTS_BROKEN)
{
s->apiRelease();
return false;
}

return true;
}

srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
{
ScopedLock cg(m_GlobControlLock);
Expand Down Expand Up @@ -2612,7 +2668,7 @@ void srt::CUDTUnited::checkBrokenSockets()

if (s->m_Status == SRTS_LISTENING)
{
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load();
// A listening socket should wait an extra 3 seconds
// in case a client is connecting.
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
Expand Down Expand Up @@ -2671,6 +2727,13 @@ void srt::CUDTUnited::checkBrokenSockets()
for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
{
CUDTSocket* ps = j->second;

if (ps->isStillBusy())
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE.");
continue;
}

maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
CUDT& u = ps->core();

// HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
Expand All @@ -2690,7 +2753,7 @@ void srt::CUDTUnited::checkBrokenSockets()
// timeout 1 second to destroy a socket AND it has been removed from
// RcvUList
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp;
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load();
if (closed_ago > seconds_from(1))
{
CRNode* rnode = u.m_pRNode;
Expand Down Expand Up @@ -2740,6 +2803,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
if (rn && rn->m_bOnList)
return;

if (s->isStillBusy())
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting");
return;
}

LOGC(smlog.Note, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy());

#if ENABLE_BONDING
if (s->m_GroupOf)
{
Expand Down
63 changes: 59 additions & 4 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ class CUDTSocket

void construct();

private:
srt::sync::atomic<int> m_iBusy;
public:
void apiAcquire() { ++m_iBusy; }
void apiRelease() { --m_iBusy; }

int isStillBusy() const
{
return m_iBusy;
}


SRT_ATTR_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

Expand All @@ -131,7 +143,8 @@ class CUDTSocket
/// of sockets in order to prevent other methods from accessing invalid address.
/// A timer is started and the socket will be removed after approximately
/// 1 second (see CUDTUnited::checkBrokenSockets()).
sync::steady_clock::time_point m_tsClosureTimeStamp;
//sync::steady_clock::time_point m_tsClosureTimeStamp;
sync::AtomicClock<sync::steady_clock> m_tsClosureTimeStamp;

sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket
Expand Down Expand Up @@ -324,7 +337,7 @@ class CUDTUnited
int epoll_release(const int eid);

#if ENABLE_BONDING
// [[using locked(m_GlobControlLock)]]
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type)
{
// This only ensures that the element exists.
Expand All @@ -346,7 +359,7 @@ class CUDTUnited
void deleteGroup(CUDTGroup* g);
void deleteGroup_LOCKED(CUDTGroup* g);

// [[using locked(m_GlobControlLock)]]
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup)
{
for (groups_t::iterator i = m_Groups.begin(); i != m_Groups.end(); ++i)
Expand Down Expand Up @@ -445,8 +458,50 @@ class CUDTUnited
}
}
};

#endif

CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
bool acquireSocket(CUDTSocket* s);

public:
struct SocketKeeper
{
CUDTSocket* socket;

SocketKeeper(): socket(NULL) {}

// This is intended for API functions to lock the socket's existence
// for the lifetime of their call.
SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }

// This is intended for TSBPD thread that should lock the socket's
// existence until it exits.
SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
{
acquire(glob, s);
}

// Note: acquire doesn't check if the keeper already keeps anything.
// This is only for a use together with an empty constructor.
bool acquire(CUDTUnited& glob, CUDTSocket* s)
{
const bool caught = glob.acquireSocket(s);
socket = caught ? s : NULL;
return caught;
}

~SocketKeeper()
{
if (socket)
{
SRT_ASSERT(socket->isStillBusy() > 0);
socket->apiRelease();
}
}
};

private:

void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);

Expand Down
30 changes: 19 additions & 11 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5844,13 +5844,15 @@
}

#if ENABLE_BONDING
m_ConnectionLock.unlock();
// The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called.
// Keep the group alive for the lifetime of this function,
// and do it BEFORE acquiring m_ConnectionLock to avoid
// lock inversion.
// This will check if a socket belongs to a group and if so
// it will remember this group and keep it alive here.
CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent);
m_ConnectionLock.lock();
#endif

if (!prepareBuffers(NULL))
Expand Down Expand Up @@ -7122,7 +7124,7 @@

do
{
if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now()))
if (stillConnected() && !timeout && !isRcvBufferReady())
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
{
/* Kick TsbPd thread to schedule next wakeup (if running) */
if (m_bTsbPd)
Expand Down Expand Up @@ -8723,11 +8725,14 @@
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
//enterCS(m_RcvBufferLock);
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved

#if ENABLE_BONDING
ScopedLock glock(uglobal().m_GlobControlLock);
ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive?
const bool drift_updated =
#endif
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
//leaveCS(m_RcvBufferLock);
Fixed Show fixed Hide fixed

#if ENABLE_BONDING
if (drift_updated && m_parent->m_GroupOf)
Expand Down Expand Up @@ -11702,17 +11707,20 @@
// Bound to one call because this requires locking
pg->updateFailedLink();
}
// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted. OTOH
// the socket can be on the path of deletion already, so
// this only makes sure that the socket will be deleted,
// one way or another.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
}

// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
#endif
}

Expand Down
8 changes: 5 additions & 3 deletions srtcore/logging.h
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct LogConfig
std::ostream* log_stream;
SRT_LOG_HANDLER_FN* loghandler_fn;
void* loghandler_opaque;
srt::sync::Mutex mutex;
mutable srt::sync::Mutex mutex;
int flags;

LogConfig(const fa_bitset_t& efa,
Expand All @@ -132,10 +132,10 @@ struct LogConfig
}

SRT_ATTR_ACQUIRE(mutex)
void lock() { mutex.lock(); }
void lock() const { mutex.lock(); }

SRT_ATTR_RELEASE(mutex)
void unlock() { mutex.unlock(); }
void unlock() const { mutex.unlock(); }
};

// The LogDispatcher class represents the object that is responsible for
Expand Down Expand Up @@ -424,8 +424,10 @@ inline bool LogDispatcher::CheckEnabled()
// when the enabler check is tested here. Worst case, the log
// will be printed just a moment after it was turned off.
const LogConfig* config = src_config; // to enforce using const operator[]
config->lock();
int configured_enabled_fa = config->enabled_fa[fa];
int configured_maxlevel = config->max_level;
config->unlock();

return configured_enabled_fa && level <= configured_maxlevel;
}
Expand Down
5 changes: 5 additions & 0 deletions srtcore/packet.h
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class IOVector
#endif
{
public:
IOVector()
{
set(0, 0);
}

inline void set(void* buffer, size_t length)
{
#ifdef _WIN32
Expand Down
Loading
Loading