From 0bee6d81986431789b3ff1f4ed989db300a1faee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 27 Feb 2024 12:03:32 +0100 Subject: [PATCH 01/16] [test] Improved tests to avoid all-breaking asserts and inter-test messup --- CMakeLists.txt | 7 +- srtcore/api.cpp | 23 +- srtcore/logging.h | 2 +- test/test_file_transmission.cpp | 1 + test/test_main.cpp | 7 +- test/test_many_connections.cpp | 14 +- test/test_reuseaddr.cpp | 546 ++++++++++++++------------------ 7 files changed, 281 insertions(+), 319 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a429cf495..0bcc62a86 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -73,7 +73,12 @@ if (NOT MICROSOFT) # that ENABLE_DEBUG is set as it should. if (ENABLE_DEBUG EQUAL 2) set (CMAKE_BUILD_TYPE "RelWithDebInfo") - add_definitions(-DNDEBUG) + if (ENABLE_ASSERT) + # Add _DEBUG macro if explicitly requested, to enable SRT_ASSERT(). + add_definitions(-D_DEBUG) + else() + add_definitions(-DNDEBUG) + endif() elseif (ENABLE_DEBUG) # 1, ON, YES, TRUE, Y, or any other non-zero number set (CMAKE_BUILD_TYPE "Debug") diff --git a/srtcore/api.cpp b/srtcore/api.cpp index cb9e5e65d..f9ac6a8d1 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2662,31 +2662,34 @@ void srt::CUDTUnited::checkBrokenSockets() for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j) { + CUDTSocket* ps = j->second; + CUDT& u = ps->core(); + // HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first); - if (!is_zero(j->second->core().m_tsLingerExpiration)) + if (!is_zero(u.m_tsLingerExpiration)) { // asynchronous close: - if ((!j->second->core().m_pSndBuffer) || (0 == j->second->core().m_pSndBuffer->getCurrBufSize()) || - (j->second->core().m_tsLingerExpiration <= steady_clock::now())) + if ((!u.m_pSndBuffer) || (0 == u.m_pSndBuffer->getCurrBufSize()) || + (u.m_tsLingerExpiration <= steady_clock::now())) { - HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID); - j->second->core().m_tsLingerExpiration = steady_clock::time_point(); - j->second->core().m_bClosing = true; - j->second->m_tsClosureTimeStamp = steady_clock::now(); + HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << ps->m_SocketID); + u.m_tsLingerExpiration = steady_clock::time_point(); + u.m_bClosing = true; + ps->m_tsClosureTimeStamp = steady_clock::now(); } } // timeout 1 second to destroy a socket AND it has been removed from // RcvUList const steady_clock::time_point now = steady_clock::now(); - const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp; + const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp; if (closed_ago > seconds_from(1)) { - CRNode* rnode = j->second->core().m_pRNode; + CRNode* rnode = u.m_pRNode; if (!rnode || !rnode->m_bOnList) { HLOGC(smlog.Debug, - log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed " + log << "checkBrokenSockets: @" << ps->m_SocketID << " closed " << FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove"); // HLOGC(smlog.Debug, log << "will unref socket: " << j->first); diff --git a/srtcore/logging.h b/srtcore/logging.h index 608234eab..c17781c24 100644 --- a/srtcore/logging.h +++ b/srtcore/logging.h @@ -154,7 +154,7 @@ struct SRT_API LogDispatcher LogLevel::type level; static const size_t MAX_PREFIX_SIZE = 32; char prefix[MAX_PREFIX_SIZE+1]; - bool enabled; + srt::sync::atomic enabled; LogConfig* src_config; bool isset(int flg) { return (src_config->flags & flg) != 0; } diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index 97f9e684a..a538683dd 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -31,6 +31,7 @@ TEST(Transmission, FileUpload) { srt::TestInit srtinit; + srtinit.HandlePerTestOptions(); // Generate the source file // We need a file that will contain more data diff --git a/test/test_main.cpp b/test/test_main.cpp index cc5acc487..cc536802f 100644 --- a/test/test_main.cpp +++ b/test/test_main.cpp @@ -113,6 +113,11 @@ void TestInit::HandlePerTestOptions() { srt_setloglevel(LOG_DEBUG); } + + if (TestEnv::me->OptionPresent("lognote")) + { + srt_setloglevel(LOG_NOTICE); + } } // Copied from ../apps/apputil.cpp, can't really link this file here. @@ -178,7 +183,7 @@ sockaddr_any CreateAddr(const std::string& name, unsigned short port, int pref_f UniqueSocket::~UniqueSocket() { - srt_close(sock); + EXPECT_NE(srt_close(sock), SRT_ERROR); } } diff --git a/test/test_many_connections.cpp b/test/test_many_connections.cpp index 20deccf70..29cd6bdd2 100644 --- a/test/test_many_connections.cpp +++ b/test/test_many_connections.cpp @@ -135,6 +135,8 @@ TEST_F(TestConnection, Multiple) cerr << "Opening " << NSOCK << " connections\n"; + bool overall_test = true; + for (size_t i = 0; i < NSOCK; i++) { m_connections[i] = srt_create_socket(); @@ -145,13 +147,18 @@ TEST_F(TestConnection, Multiple) int conntimeo = 60; srt_setsockflag(m_connections[i], SRTO_CONNTIMEO, &conntimeo, sizeof conntimeo); + SRTSOCKET connres = SRT_INVALID_SOCK; + //cerr << "Connecting #" << i << " to " << sockaddr_any(psa).str() << "...\n"; //cerr << "Connecting to: " << sockaddr_any(psa).str() << endl; - ASSERT_NE(srt_connect(m_connections[i], psa, sizeof lsa), SRT_ERROR); + connres = srt_connect(m_connections[i], psa, sizeof lsa); + EXPECT_NE(connres, SRT_INVALID_SOCK) << "conn #" << i << ": " << srt_getlasterror_str(); + if (connres == SRT_INVALID_SOCK) + overall_test = false; // Set now async sending so that sending isn't blocked int no = 0; - ASSERT_NE(srt_setsockflag(m_connections[i], SRTO_SNDSYN, &no, sizeof no), -1); + EXPECT_NE(srt_setsockflag(m_connections[i], SRTO_SNDSYN, &no, sizeof no), -1); } for (size_t j = 1; j <= 100; j++) @@ -170,6 +177,7 @@ TEST_F(TestConnection, Multiple) EXPECT_FALSE(m_accept_exit) << "AcceptLoop already broken for some reason!"; // Up to this moment the server sock should survive + cerr << "Closing server socket\n"; // Close server socket to break the accept loop EXPECT_EQ(srt_close(m_server_sock), 0); @@ -177,6 +185,8 @@ TEST_F(TestConnection, Multiple) cerr << "Synchronize with the accepting thread\n"; ex.wait(); cerr << "Synchronization done\n"; + + ASSERT_TRUE(overall_test); } diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index fe9027311..8779a3a79 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -30,7 +30,7 @@ struct AtReturnJoin // iphlp library to be attached to the executable, which is kinda // problematic. Temporarily block tests using this function on Windows. -std::string GetLocalIP(int af = AF_UNSPEC) +static std::string GetLocalIP(int af = AF_UNSPEC) { std::cout << "!!!WARNING!!!: GetLocalIP not supported, test FORCEFULLY passed\n"; return ""; @@ -50,7 +50,7 @@ struct IfAddr } }; -std::string GetLocalIP(int af = AF_UNSPEC) +static std::string GetLocalIP(int af = AF_UNSPEC) { struct ifaddrs * ifa=NULL; void * tmpAddrPtr=NULL; @@ -101,306 +101,320 @@ std::string GetLocalIP(int af = AF_UNSPEC) } #endif -int client_pollid = SRT_ERROR; -SRTSOCKET g_client_sock = SRT_ERROR; - -void clientSocket(std::string ip, int port, bool expect_success) +class ReuseAddr : public srt::Test { - int yes = 1; - int no = 0; + int m_client_pollid = SRT_ERROR; + int m_server_pollid = SRT_ERROR; + - int family = AF_INET; - std::string famname = "IPv4"; - if (ip.substr(0, 2) == "6.") +protected: + void clientSocket(SRTSOCKET client_sock, std::string ip, int port, bool expect_success) { - family = AF_INET6; - ip = ip.substr(2); - famname = "IPv6"; - } + using namespace std; - std::cout << "[T/C] Creating client socket\n"; + int yes = 1; + int no = 0; - g_client_sock = srt_create_socket(); - ASSERT_NE(g_client_sock, SRT_ERROR); + int family = AF_INET; + string famname = "IPv4"; + if (ip.substr(0, 2) == "6.") + { + family = AF_INET6; + ip = ip.substr(2); + famname = "IPv6"; + } - ASSERT_NE(srt_setsockopt(g_client_sock, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // for async connect - ASSERT_NE(srt_setsockflag(g_client_sock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); + cout << "[T/C] Setting up client socket\n"; + ASSERT_NE(client_sock, SRT_INVALID_SOCK); + ASSERT_EQ(srt_getsockstate(client_sock), SRTS_INIT); - ASSERT_NE(srt_setsockopt(g_client_sock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + EXPECT_NE(srt_setsockflag(client_sock, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // for async connect + EXPECT_NE(srt_setsockflag(client_sock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); + EXPECT_NE(srt_setsockflag(client_sock, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); - int epoll_out = SRT_EPOLL_OUT; - srt_epoll_add_usock(client_pollid, g_client_sock, &epoll_out); + int epoll_out = SRT_EPOLL_OUT; + srt_epoll_add_usock(m_client_pollid, client_sock, &epoll_out); - sockaddr_any sa = srt::CreateAddr(ip, port, family); + sockaddr_any sa = srt::CreateAddr(ip, port, family); - std::cout << "[T/C] Connecting to: " << sa.str() << " (" << famname << ")" << std::endl; + cout << "[T/C] Connecting to: " << sa.str() << " (" << famname << ")" << endl; - int connect_res = srt_connect(g_client_sock, sa.get(), sa.size()); + int connect_res = srt_connect(client_sock, sa.get(), sa.size()); - if (connect_res == -1) - { - std::cout << "srt_connect: " << srt_getlasterror_str() << std::endl; - } - - if (expect_success) - { - EXPECT_NE(connect_res, -1); if (connect_res == -1) - return; - - // Socket readiness for connection is checked by polling on WRITE allowed sockets. - - if (connect_res != -1) { - int rlen = 2; - SRTSOCKET read[2]; - - int wlen = 2; - SRTSOCKET write[2]; - - std::cout << "[T/C] Waiting for connection readiness...\n"; - - EXPECT_NE(srt_epoll_wait(client_pollid, read, &rlen, - write, &wlen, - -1, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR); - - - EXPECT_EQ(rlen, 0); // get exactly one write event without reads - EXPECT_EQ(wlen, 1); // get exactly one write event without reads - EXPECT_EQ(write[0], g_client_sock); // for our client socket + cout << "srt_connect: " << srt_getlasterror_str() << endl; + } - char buffer[1316] = {1, 2, 3, 4}; - EXPECT_NE(srt_sendmsg(g_client_sock, buffer, sizeof buffer, - -1, // infinit ttl - true // in order must be set to true - ), - SRT_ERROR); + if (expect_success) + { + EXPECT_NE(connect_res, -1); + if (connect_res == -1) + return; + + // Socket readiness for connection is checked by polling on WRITE allowed sockets. + + if (connect_res != -1) + { + int rlen = 2; + SRTSOCKET read[2]; + + int wlen = 2; + SRTSOCKET write[2]; + + cout << "[T/C] Waiting for connection readiness...\n"; + + EXPECT_NE(srt_epoll_wait(m_client_pollid, read, &rlen, + write, &wlen, + -1, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR); + + EXPECT_EQ(rlen, 0) << "[T/C] read-ready"; // get exactly one write event without reads + EXPECT_EQ(wlen, 1) << "[T/C] write-ready"; // get exactly one write event without reads + EXPECT_EQ(write[0], client_sock); // for our client socket + + char buffer[1316] = {1, 2, 3, 4}; + EXPECT_NE(srt_sendmsg(client_sock, buffer, sizeof buffer, + -1, // infinit ttl + true // in order must be set to true + ), + SRT_ERROR); + } + else + { + cout << "[T/C] (NOT TESTING TRANSMISSION - CONNECTION FAILED ALREADY)\n"; + } } else { - std::cout << "[T/C] (NOT TESTING TRANSMISSION - CONNECTION FAILED ALREADY)\n"; + EXPECT_EQ(connect_res, -1); } - } - else - { - EXPECT_EQ(connect_res, -1); + + cout << "[T/C] Client exit\n"; } - std::cout << "[T/C] Client exit\n"; -} + SRTSOCKET prepareSocket() + { + SRTSOCKET bindsock = srt_create_socket(); + EXPECT_NE(bindsock, SRT_ERROR); -int server_pollid = SRT_ERROR; + int yes = 1; + int no = 0; -SRTSOCKET prepareSocket() -{ - SRTSOCKET bindsock = srt_create_socket(); - EXPECT_NE(bindsock, SRT_ERROR); + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); - int yes = 1; - int no = 0; + int epoll_in = SRT_EPOLL_IN; - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + std::cout << "[T/S] Listener/binder sock @" << bindsock << " added to m_server_pollid\n"; + srt_epoll_add_usock(m_server_pollid, bindsock, &epoll_in); - int epoll_in = SRT_EPOLL_IN; + return bindsock; + } - std::cout << "[T/S] Listener/binder sock @" << bindsock << " added to server_pollid\n"; - srt_epoll_add_usock(server_pollid, bindsock, &epoll_in); + bool bindSocket(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) + { + sockaddr_any sa = srt::CreateAddr(ip, port, AF_INET); - return bindsock; -} + std::string fam = (sa.family() == AF_INET) ? "IPv4" : "IPv6"; -bool bindSocket(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ - sockaddr_any sa = srt::CreateAddr(ip, port, AF_INET); + std::cout << "[T/S] Bind @" << bindsock << " to: " << sa.str() << " (" << fam << ")" << std::endl; - std::string fam = (sa.family() == AF_INET) ? "IPv4" : "IPv6"; + int bind_res = srt_bind(bindsock, sa.get(), sa.size()); - std::cout << "[T/S] Bind @" << bindsock << " to: " << sa.str() << " (" << fam << ")" << std::endl; + std::cout << "[T/S] ... result " << bind_res << " (expected to " + << (expect_success ? "succeed" : "fail") << ")\n"; - int bind_res = srt_bind(bindsock, sa.get(), sa.size()); + if (!expect_success) + { + std::cout << "[T/S] Binding should fail: " << srt_getlasterror_str() << std::endl; + EXPECT_EQ(bind_res, SRT_ERROR); + return false; + } - std::cout << "[T/S] ... result " << bind_res << " (expected to " - << (expect_success ? "succeed" : "fail") << ")\n"; + EXPECT_NE(bind_res, SRT_ERROR); + return true; + } - if (!expect_success) + bool bindListener(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) { - std::cout << "[T/S] Binding should fail: " << srt_getlasterror_str() << std::endl; - EXPECT_EQ(bind_res, SRT_ERROR); - return false; - } + if (!bindSocket(bindsock, ip, port, expect_success)) + return false; - EXPECT_NE(bind_res, SRT_ERROR); - return true; -} + EXPECT_NE(srt_listen(bindsock, SOMAXCONN), SRT_ERROR); -bool bindListener(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ - if (!bindSocket(bindsock, ip, port, expect_success)) - return false; + return true; + } - EXPECT_NE(srt_listen(bindsock, SOMAXCONN), SRT_ERROR); + SRTSOCKET createListener(std::string ip, int port, bool expect_success) + { + std::cout << "[T/S] serverSocket: creating listener socket\n"; - return true; -} + SRTSOCKET bindsock = prepareSocket(); -SRTSOCKET createListener(std::string ip, int port, bool expect_success) -{ - std::cout << "[T/S] serverSocket: creating listener socket\n"; + if (!bindListener(bindsock, ip, port, expect_success)) + return SRT_INVALID_SOCK; - SRTSOCKET bindsock = prepareSocket(); + return bindsock; + } - if (!bindListener(bindsock, ip, port, expect_success)) - return SRT_INVALID_SOCK; + SRTSOCKET createBinder(std::string ip, int port, bool expect_success) + { + std::cout << "[T/S] serverSocket: creating binder socket\n"; - return bindsock; -} + SRTSOCKET bindsock = prepareSocket(); -SRTSOCKET createBinder(std::string ip, int port, bool expect_success) -{ - std::cout << "[T/S] serverSocket: creating binder socket\n"; + if (!bindSocket(bindsock, ip, port, expect_success)) + { + srt_close(bindsock); + return SRT_INVALID_SOCK; + } - SRTSOCKET bindsock = prepareSocket(); + return bindsock; + } - if (!bindSocket(bindsock, ip, port, expect_success)) + void testAccept(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) { - srt_close(bindsock); - return SRT_INVALID_SOCK; - } + srt::UniqueSocket client_sock = srt_create_socket(); - return bindsock; -} + auto run = [this, &client_sock, ip, port, expect_success]() { clientSocket(client_sock, ip, port, expect_success); }; -void testAccept(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ + auto launched = std::async(std::launch::async, run); - auto run = [ip, port, expect_success]() { clientSocket(ip, port, expect_success); }; + AtReturnJoin atreturn_join {launched}; - auto launched = std::async(std::launch::async, run); - - AtReturnJoin atreturn_join {launched}; + { // wait for connection from client + int rlen = 2; + SRTSOCKET read[2]; - { // wait for connection from client - int rlen = 2; - SRTSOCKET read[2]; + int wlen = 2; + SRTSOCKET write[2]; - int wlen = 2; - SRTSOCKET write[2]; + std::cout << "[T/S] Wait 10s for acceptance on @" << bindsock << " ...\n"; - std::cout << "[T/S] Wait 10s for acceptance on @" << bindsock << " ...\n"; + EXPECT_NE(srt_epoll_wait(m_server_pollid, + read, &rlen, + write, &wlen, + 10000, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR ); - ASSERT_NE(srt_epoll_wait(server_pollid, - read, &rlen, - write, &wlen, - 10000, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + EXPECT_EQ(rlen, 1); // get exactly one read event without writes + EXPECT_EQ(wlen, 0); // get exactly one read event without writes + ASSERT_EQ(read[0], bindsock); // read event is for bind socket + } - ASSERT_EQ(rlen, 1); // get exactly one read event without writes - ASSERT_EQ(wlen, 0); // get exactly one read event without writes - ASSERT_EQ(read[0], bindsock); // read event is for bind socket - } + { + sockaddr_any scl; + srt::UniqueSocket accepted_sock = srt_accept(bindsock, scl.get(), &scl.len); + if (accepted_sock == -1) + { + std::cout << "srt_accept: " << srt_getlasterror_str() << std::endl; + } + EXPECT_NE(accepted_sock.ref(), SRT_INVALID_SOCK); - sockaddr_any scl; + sockaddr_any showacp = (sockaddr*)&scl; + std::cout << "[T/S] Accepted from: " << showacp.str() << std::endl; - SRTSOCKET accepted_sock = srt_accept(bindsock, scl.get(), &scl.len); - if (accepted_sock == -1) - { - std::cout << "srt_accept: " << srt_getlasterror_str() << std::endl; - } - ASSERT_NE(accepted_sock, SRT_INVALID_SOCK); + int epoll_in = SRT_EPOLL_IN; + srt_epoll_add_usock(m_server_pollid, accepted_sock, &epoll_in); // wait for input - sockaddr_any showacp = (sockaddr*)&scl; - std::cout << "[T/S] Accepted from: " << showacp.str() << std::endl; + char buffer[1316]; + { // wait for 1316 packet from client + int rlen = 2; + SRTSOCKET read[2]; - int epoll_in = SRT_EPOLL_IN; - srt_epoll_add_usock(server_pollid, accepted_sock, &epoll_in); // wait for input + int wlen = 2; + SRTSOCKET write[2]; - char buffer[1316]; - { // wait for 1316 packet from client - int rlen = 2; - SRTSOCKET read[2]; + std::cout << "[T/S] Wait for data reception...\n"; - int wlen = 2; - SRTSOCKET write[2]; + EXPECT_NE(srt_epoll_wait(m_server_pollid, + read, &rlen, + write, &wlen, + -1, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR ); - std::cout << "[T/S] Wait for data reception...\n"; - ASSERT_NE(srt_epoll_wait(server_pollid, - read, &rlen, - write, &wlen, - -1, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + EXPECT_EQ(rlen, 1) << "[T/S] read-ready"; // get exactly one read event without writes + EXPECT_EQ(wlen, 0) << "[T/S] write-ready"; // get exactly one read event without writes + EXPECT_EQ(read[0], accepted_sock.ref()); // read event is for bind socket + } + char pattern[4] = {1, 2, 3, 4}; - ASSERT_EQ(rlen, 1); // get exactly one read event without writes - ASSERT_EQ(wlen, 0); // get exactly one read event without writes - ASSERT_EQ(read[0], accepted_sock); // read event is for bind socket - } + EXPECT_EQ(srt_recvmsg(accepted_sock, buffer, sizeof buffer), + 1316); - char pattern[4] = {1, 2, 3, 4}; + EXPECT_EQ(memcmp(pattern, buffer, sizeof pattern), 0); - ASSERT_EQ(srt_recvmsg(accepted_sock, buffer, sizeof buffer), - 1316); + std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << " LSN:@" << bindsock << "...\n"; + } + // client_sock closed through UniqueSocket. + // cannot close client_sock after srt_sendmsg because of issue in api.c:2346 - EXPECT_EQ(memcmp(pattern, buffer, sizeof pattern), 0); + std::cout << "[T/S] joining client async (should close client socket)...\n"; + launched.get(); + } - std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << " LSN:@" << bindsock << " CLR:@" << g_client_sock << " ...\n"; - ASSERT_NE(srt_close(accepted_sock), SRT_ERROR); - ASSERT_NE(srt_close(g_client_sock), SRT_ERROR); // cannot close g_client_sock after srt_sendmsg because of issue in api.c:2346 + static void shutdownListener(SRTSOCKET bindsock) + { + // Silently ignore. Usually it should have been checked earlier, + // and an invalid sock might be expected in particular tests. + if (bindsock == SRT_INVALID_SOCK) + return; - std::cout << "[T/S] joining client async...\n"; - launched.get(); -} + int yes = 1; + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &yes, sizeof yes), SRT_ERROR); // for async connect + EXPECT_NE(srt_close(bindsock), SRT_ERROR); -void shutdownListener(SRTSOCKET bindsock) -{ - // Silently ignore. Usually it should have been checked earlier, - // and an invalid sock might be expected in particular tests. - if (bindsock == SRT_INVALID_SOCK) - return; + std::chrono::milliseconds check_period (100); + int credit = 400; // 10 seconds + auto then = std::chrono::steady_clock::now(); - int yes = 1; - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &yes, sizeof yes), SRT_ERROR); // for async connect - EXPECT_NE(srt_close(bindsock), SRT_ERROR); + std::cout << "[T/S] waiting for cleanup of @" << bindsock << " up to 10s" << std::endl; + while (srt_getsockstate(bindsock) != SRTS_NONEXIST) + { + std::this_thread::sleep_for(check_period); + --credit; + if (!credit) + break; + } + auto now = std::chrono::steady_clock::now(); + auto dur = std::chrono::duration_cast(now - then); - std::chrono::milliseconds check_period (100); - int credit = 400; // 10 seconds - auto then = std::chrono::steady_clock::now(); + // Keep as single string because this tends to be mixed from 2 threads. + std::ostringstream sout; + sout << "[T/S] @" << bindsock << " dissolved after " + << (dur.count() / 1000.0) << "s" << std::endl; + std::cout << sout.str() << std::flush; - std::cout << "[T/S] waiting for cleanup of @" << bindsock << " up to 10s" << std::endl; - while (srt_getsockstate(bindsock) != SRTS_NONEXIST) - { - std::this_thread::sleep_for(check_period); - --credit; - if (!credit) - break; + EXPECT_NE(credit, 0); } - auto now = std::chrono::steady_clock::now(); - auto dur = std::chrono::duration_cast(now - then); - // Keep as single string because this tends to be mixed from 2 threads. - std::ostringstream sout; - sout << "[T/S] @" << bindsock << " dissolved after " - << (dur.count() / 1000.0) << "s" << std::endl; - std::cout << sout.str() << std::flush; +private: - EXPECT_NE(credit, 0); -} + void setup() + { + m_client_pollid = srt_epoll_create(); + ASSERT_NE(m_client_pollid, SRT_ERROR); -TEST(ReuseAddr, SameAddr1) -{ - srt::TestInit srtinit; + m_server_pollid = srt_epoll_create(); + ASSERT_NE(m_server_pollid, SRT_ERROR); + } - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); + void teardown() + { + (void)srt_epoll_release(m_client_pollid); + (void)srt_epoll_release(m_server_pollid); + } +}; - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); +TEST_F(ReuseAddr, SameAddr1) +{ SRTSOCKET bindsock_1 = createBinder("127.0.0.1", 5000, true); SRTSOCKET bindsock_2 = createListener("127.0.0.1", 5000, true); @@ -413,24 +427,14 @@ TEST(ReuseAddr, SameAddr1) s1.join(); s2.join(); - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, SameAddr2) +TEST_F(ReuseAddr, SameAddr2) { - srt::TestInit srtinit; std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createBinder(localip, 5000, true); SRTSOCKET bindsock_2 = createListener(localip, 5000, true); @@ -445,21 +449,11 @@ TEST(ReuseAddr, SameAddr2) testAccept(bindsock_3, localip, 5000, true); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, SameAddrV6) +TEST_F(ReuseAddr, SameAddrV6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); SRTSOCKET bindsock_1 = createBinder("::1", 5000, true); SRTSOCKET bindsock_2 = createListener("::1", 5000, true); @@ -475,26 +469,15 @@ TEST(ReuseAddr, SameAddrV6) testAccept(bindsock_3, "::1", 5000, true); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, DiffAddr) +TEST_F(ReuseAddr, DiffAddr) { - srt::TestInit srtinit; std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createBinder("127.0.0.1", 5000, true); SRTSOCKET bindsock_2 = createListener(localip, 5000, true); @@ -505,14 +488,10 @@ TEST(ReuseAddr, DiffAddr) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, Wildcard) +TEST_F(ReuseAddr, Wildcard) { - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; @@ -524,14 +503,6 @@ TEST(ReuseAddr, Wildcard) std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // Binding a certain address when wildcard is already bound should fail. @@ -541,15 +512,11 @@ TEST(ReuseAddr, Wildcard) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, Wildcard6) +TEST_F(ReuseAddr, Wildcard6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; @@ -567,13 +534,6 @@ TEST(ReuseAddr, Wildcard6) // performed there. std::string localip_v4 = GetLocalIP(AF_INET); - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - // This must be obligatory set before binding a socket to "::" int strict_ipv6 = 1; @@ -620,28 +580,18 @@ TEST(ReuseAddr, Wildcard6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, ProtocolVersion6) +TEST_F(ReuseAddr, ProtocolVersion6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; return; #endif - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. @@ -659,27 +609,18 @@ TEST(ReuseAddr, ProtocolVersion6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, ProtocolVersionFaux6) +TEST_F(ReuseAddr, ProtocolVersionFaux6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; + #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; return; #endif - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. @@ -696,7 +637,4 @@ TEST(ReuseAddr, ProtocolVersionFaux6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } From d8dfa0c6ea04f018eb610194e2a0a0fa71f244a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 27 Feb 2024 12:16:11 +0100 Subject: [PATCH 02/16] (First concept, failing tests, to be fixed) --- CMakeLists.txt | 7 +- srtcore/api.cpp | 114 ++++++- srtcore/api.h | 58 +++- srtcore/logging.h | 2 +- srtcore/queue.cpp | 36 ++- test/test_file_transmission.cpp | 1 + test/test_main.cpp | 7 +- test/test_many_connections.cpp | 14 +- test/test_reuseaddr.cpp | 545 ++++++++++++++------------------ 9 files changed, 458 insertions(+), 326 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 82b0e3cff..6c6feb5f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -73,7 +73,12 @@ if (NOT MICROSOFT) # that ENABLE_DEBUG is set as it should. if (ENABLE_DEBUG EQUAL 2) set (CMAKE_BUILD_TYPE "RelWithDebInfo") - add_definitions(-DNDEBUG) + if (ENABLE_ASSERT) + # Add _DEBUG macro if explicitly requested, to enable SRT_ASSERT(). + add_definitions(-D_DEBUG) + else() + add_definitions(-DNDEBUG) + endif() elseif (ENABLE_DEBUG) # 1, ON, YES, TRUE, Y, or any other non-zero number set (CMAKE_BUILD_TYPE "Debug") diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 7be0d1d95..56c7086e7 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1905,11 +1905,28 @@ int srt::CUDTUnited::close(const SRTSOCKET u) return 0; } #endif - CUDTSocket* s = locateSocket(u); - if (!s) - throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0); - return close(s); + // Wrapping the log into a destructor so that it + // is printed AFTER the destructor of SocketKeeper. + struct ForceDestructor + { + CUDTSocket* ps; + ForceDestructor(): ps(NULL){} + ~ForceDestructor() + { + if (ps) // Could be not acquired by SocketKeeper, occasionally + { + LOGC(smlog.Note, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); + } + } + } fod; + + SocketKeeper k(*this, u, ERH_THROW); + fod.ps = k.socket; + LOGC(smlog.Note, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy()); + int ret = close(k.socket); + + return ret; } #if ENABLE_BONDING @@ -2538,6 +2555,57 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s) } #endif +srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh) +{ + ScopedLock cg(m_GlobControlLock); + + CUDTSocket* s = locateSocket_LOCKED(u); + if (!s) + { + if (erh == ERH_THROW) + throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0); + return NULL; + } + + s->apiAcquire(); + return s; +} + +bool srt::CUDTUnited::acquireSocket(CUDTSocket* s) +{ + /* + ScopedLock cg(m_GlobControlLock); + + // JUST IN CASE, try to find the socket in m_Sockets (NOT in m_ClosedSockets). + // If it's not there, just pretend it's already deleted. The m_ClosedSockets + // is a container that keeps sockets that are being still in use of other threads, + // but were dispatched or acquired before another activity has requested them + // to be deleted. + + // This uses simply a pointer value, so it should be safe even if the pointer + // was dangling. We state it's virtually impossible for a non-paused thread + // to get in a situation of having the reclaimed memory after a socket removed + // from m_Sockets to be reassigned to another new socket. Note that the socket + // being normally passed here as argument is a socket that was previously + // dispatched from somewhere else. + + for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i) + { + CUDTSocket* is = i->second; + + if (s == is) + { + */ + s->apiAcquire(); + return true; + /* + } + } + s->apiAcquire(); + return false; + */ +} + srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn) { ScopedLock cg(m_GlobControlLock); @@ -2662,31 +2730,41 @@ void srt::CUDTUnited::checkBrokenSockets() for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j) { + CUDTSocket* ps = j->second; + + if (ps->isStillBusy()) + { + HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE."); + continue; + } + + CUDT& u = ps->core(); + // HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first); - if (!is_zero(j->second->core().m_tsLingerExpiration)) + if (!is_zero(u.m_tsLingerExpiration)) { // asynchronous close: - if ((!j->second->core().m_pSndBuffer) || (0 == j->second->core().m_pSndBuffer->getCurrBufSize()) || - (j->second->core().m_tsLingerExpiration <= steady_clock::now())) + if ((!u.m_pSndBuffer) || (0 == u.m_pSndBuffer->getCurrBufSize()) || + (u.m_tsLingerExpiration <= steady_clock::now())) { - HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID); - j->second->core().m_tsLingerExpiration = steady_clock::time_point(); - j->second->core().m_bClosing = true; - j->second->m_tsClosureTimeStamp = steady_clock::now(); + HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << ps->m_SocketID); + u.m_tsLingerExpiration = steady_clock::time_point(); + u.m_bClosing = true; + ps->m_tsClosureTimeStamp = steady_clock::now(); } } // timeout 1 second to destroy a socket AND it has been removed from // RcvUList const steady_clock::time_point now = steady_clock::now(); - const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp; + const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp; if (closed_ago > seconds_from(1)) { - CRNode* rnode = j->second->core().m_pRNode; + CRNode* rnode = u.m_pRNode; if (!rnode || !rnode->m_bOnList) { HLOGC(smlog.Debug, - log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed " + log << "checkBrokenSockets: @" << ps->m_SocketID << " closed " << FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove"); // HLOGC(smlog.Debug, log << "will unref socket: " << j->first); @@ -2729,6 +2807,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) if (rn && rn->m_bOnList) return; + if (s->isStillBusy()) + { + HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting"); + return; + } + + LOGC(smlog.Note, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy()); + #if ENABLE_BONDING if (s->m_GroupOf) { diff --git a/srtcore/api.h b/srtcore/api.h index 9ba77d23a..9ee155962 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -123,6 +123,18 @@ class CUDTSocket void construct(); +private: + srt::sync::atomic m_iBusy; +public: + void apiAcquire() { ++m_iBusy; } + void apiRelease() { --m_iBusy; } + + int isStillBusy() + { + return m_iBusy; + } + + SRT_ATTR_GUARDED_BY(m_ControlLock) sync::atomic m_Status; //< current socket state @@ -442,8 +454,52 @@ class CUDTUnited } } }; - #endif + + CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN); + bool acquireSocket(CUDTSocket* s); + +public: + struct SocketKeeper + { + CUDTSocket* socket; + + SocketKeeper(): socket(NULL) {} + + // This is intended for API functions to lock the group's existence + // for the lifetime of their call. + SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); } + + // This is intended for TSBPD thread that should lock the group's + // existence until it exits. + SocketKeeper(CUDTUnited& glob, CUDTSocket* s) + { + acquire(glob, s); + } + + // Note: acquire doesn't check if the keeper already keeps anything. + // This is only for a use together with an empty constructor. + bool acquire(CUDTUnited& glob, CUDTSocket* s) + { + bool caught = glob.acquireSocket(s); + socket = caught ? s : NULL; + return caught; + } + + ~SocketKeeper() + { + if (socket) + { + SRT_ASSERT(socket.m_iBusy > 0); + socket->apiRelease(); + // Only now that the group lock is lifted, can the + // group be now deleted and this pointer potentially dangling + } + } + }; + +private: + void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL); bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls); diff --git a/srtcore/logging.h b/srtcore/logging.h index 608234eab..c17781c24 100644 --- a/srtcore/logging.h +++ b/srtcore/logging.h @@ -154,7 +154,7 @@ struct SRT_API LogDispatcher LogLevel::type level; static const size_t MAX_PREFIX_SIZE = 32; char prefix[MAX_PREFIX_SIZE+1]; - bool enabled; + srt::sync::atomic enabled; LogConfig* src_config; bool isset(int flg) { return (src_config->flags & flg) != 0; } diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index cf0016901..682aa1ff3 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -576,6 +576,13 @@ void* srt::CSndQueue::worker(void* param) continue; } + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->id()); + if (!sk.socket) + { + HLOGC(qslog.Debug, log << "Socket to be processed was deleted in the meantime, not packing"); + continue; + } + // pack a packet from the socket CPacket pkt; steady_clock::time_point next_send_time; @@ -588,7 +595,6 @@ void* srt::CSndQueue::worker(void* param) IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++); continue; } - const sockaddr_any addr = u->m_PeerAddr; if (!is_zero(next_send_time)) self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time); @@ -930,6 +936,16 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst EReadStatus read_st = rst; EConnectStatus conn_st = cst; + /* + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id); + if (!sk.socket) + { + // Socket deleted already, so stop this and proceed to the next loop. + LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, proceed to only removal from lists"); + toRemove.push_back(*i); + continue; + } + // */ if (cst != CONN_RENDEZVOUS && dest_id != 0) { @@ -976,14 +992,23 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst for (vector::iterator i = toRemove.begin(); i != toRemove.end(); ++i) { HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id); - // + remove(i->id); + /* + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id); + if (!sk.socket) + { + // This actually shall never happen, so it's a kind of paranoid check. + LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents"); + continue; + } + // */ + // Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue // because the next CUDT::close will not remove it from the queue when m_bConnecting = false, // and may crash on next pass. // // TODO: maybe lock i->u->m_ConnectionLock? i->u->m_bConnecting = false; - remove(i->u->m_SocketID); // DO NOT close the socket here because in this case it might be // unable to get status from at the right moment. Also only member @@ -994,6 +1019,11 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst CUDT::uglobal().m_EPoll.update_events( i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true); + // Make sure that the socket wasn't deleted in the meantime. + // Skip this part if it was. Note also that if the socket was + // decided to be deleted, it's already moved to m_ClosedSockets + // and should have been therefore already processed for deletion. + i->u->completeBrokenConnectionDependencies(i->errorcode); } diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index 97f9e684a..a538683dd 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -31,6 +31,7 @@ TEST(Transmission, FileUpload) { srt::TestInit srtinit; + srtinit.HandlePerTestOptions(); // Generate the source file // We need a file that will contain more data diff --git a/test/test_main.cpp b/test/test_main.cpp index cc5acc487..cc536802f 100644 --- a/test/test_main.cpp +++ b/test/test_main.cpp @@ -113,6 +113,11 @@ void TestInit::HandlePerTestOptions() { srt_setloglevel(LOG_DEBUG); } + + if (TestEnv::me->OptionPresent("lognote")) + { + srt_setloglevel(LOG_NOTICE); + } } // Copied from ../apps/apputil.cpp, can't really link this file here. @@ -178,7 +183,7 @@ sockaddr_any CreateAddr(const std::string& name, unsigned short port, int pref_f UniqueSocket::~UniqueSocket() { - srt_close(sock); + EXPECT_NE(srt_close(sock), SRT_ERROR); } } diff --git a/test/test_many_connections.cpp b/test/test_many_connections.cpp index 20deccf70..29cd6bdd2 100644 --- a/test/test_many_connections.cpp +++ b/test/test_many_connections.cpp @@ -135,6 +135,8 @@ TEST_F(TestConnection, Multiple) cerr << "Opening " << NSOCK << " connections\n"; + bool overall_test = true; + for (size_t i = 0; i < NSOCK; i++) { m_connections[i] = srt_create_socket(); @@ -145,13 +147,18 @@ TEST_F(TestConnection, Multiple) int conntimeo = 60; srt_setsockflag(m_connections[i], SRTO_CONNTIMEO, &conntimeo, sizeof conntimeo); + SRTSOCKET connres = SRT_INVALID_SOCK; + //cerr << "Connecting #" << i << " to " << sockaddr_any(psa).str() << "...\n"; //cerr << "Connecting to: " << sockaddr_any(psa).str() << endl; - ASSERT_NE(srt_connect(m_connections[i], psa, sizeof lsa), SRT_ERROR); + connres = srt_connect(m_connections[i], psa, sizeof lsa); + EXPECT_NE(connres, SRT_INVALID_SOCK) << "conn #" << i << ": " << srt_getlasterror_str(); + if (connres == SRT_INVALID_SOCK) + overall_test = false; // Set now async sending so that sending isn't blocked int no = 0; - ASSERT_NE(srt_setsockflag(m_connections[i], SRTO_SNDSYN, &no, sizeof no), -1); + EXPECT_NE(srt_setsockflag(m_connections[i], SRTO_SNDSYN, &no, sizeof no), -1); } for (size_t j = 1; j <= 100; j++) @@ -170,6 +177,7 @@ TEST_F(TestConnection, Multiple) EXPECT_FALSE(m_accept_exit) << "AcceptLoop already broken for some reason!"; // Up to this moment the server sock should survive + cerr << "Closing server socket\n"; // Close server socket to break the accept loop EXPECT_EQ(srt_close(m_server_sock), 0); @@ -177,6 +185,8 @@ TEST_F(TestConnection, Multiple) cerr << "Synchronize with the accepting thread\n"; ex.wait(); cerr << "Synchronization done\n"; + + ASSERT_TRUE(overall_test); } diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index fe9027311..b20093050 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -30,7 +30,7 @@ struct AtReturnJoin // iphlp library to be attached to the executable, which is kinda // problematic. Temporarily block tests using this function on Windows. -std::string GetLocalIP(int af = AF_UNSPEC) +static std::string GetLocalIP(int af = AF_UNSPEC) { std::cout << "!!!WARNING!!!: GetLocalIP not supported, test FORCEFULLY passed\n"; return ""; @@ -50,7 +50,7 @@ struct IfAddr } }; -std::string GetLocalIP(int af = AF_UNSPEC) +static std::string GetLocalIP(int af = AF_UNSPEC) { struct ifaddrs * ifa=NULL; void * tmpAddrPtr=NULL; @@ -101,306 +101,321 @@ std::string GetLocalIP(int af = AF_UNSPEC) } #endif -int client_pollid = SRT_ERROR; -SRTSOCKET g_client_sock = SRT_ERROR; - -void clientSocket(std::string ip, int port, bool expect_success) +class ReuseAddr : public srt::Test { - int yes = 1; - int no = 0; + int m_client_pollid = SRT_ERROR; + //SRTSOCKET m_client_sock = SRT_INVALID_SOCK; + int m_server_pollid = SRT_ERROR; + - int family = AF_INET; - std::string famname = "IPv4"; - if (ip.substr(0, 2) == "6.") +protected: + void clientSocket(std::string ip, int port, bool expect_success) { - family = AF_INET6; - ip = ip.substr(2); - famname = "IPv6"; - } + int yes = 1; + int no = 0; - std::cout << "[T/C] Creating client socket\n"; + int family = AF_INET; + std::string famname = "IPv4"; + if (ip.substr(0, 2) == "6.") + { + family = AF_INET6; + ip = ip.substr(2); + famname = "IPv6"; + } - g_client_sock = srt_create_socket(); - ASSERT_NE(g_client_sock, SRT_ERROR); + std::cout << "[T/C] Creating client socket\n"; - ASSERT_NE(srt_setsockopt(g_client_sock, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // for async connect - ASSERT_NE(srt_setsockflag(g_client_sock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); + srt::UniqueSocket client_sock = srt_create_socket(); + ASSERT_NE(client_sock, SRT_ERROR); - ASSERT_NE(srt_setsockopt(g_client_sock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + ASSERT_NE(srt_setsockopt(client_sock, 0, SRTO_SNDSYN, &no, sizeof no), SRT_ERROR); // for async connect + ASSERT_NE(srt_setsockflag(client_sock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); - int epoll_out = SRT_EPOLL_OUT; - srt_epoll_add_usock(client_pollid, g_client_sock, &epoll_out); + ASSERT_NE(srt_setsockopt(client_sock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); - sockaddr_any sa = srt::CreateAddr(ip, port, family); + int epoll_out = SRT_EPOLL_OUT; + srt_epoll_add_usock(m_client_pollid, client_sock, &epoll_out); - std::cout << "[T/C] Connecting to: " << sa.str() << " (" << famname << ")" << std::endl; + sockaddr_any sa = srt::CreateAddr(ip, port, family); - int connect_res = srt_connect(g_client_sock, sa.get(), sa.size()); + std::cout << "[T/C] Connecting to: " << sa.str() << " (" << famname << ")" << std::endl; - if (connect_res == -1) - { - std::cout << "srt_connect: " << srt_getlasterror_str() << std::endl; - } + int connect_res = srt_connect(client_sock, sa.get(), sa.size()); - if (expect_success) - { - EXPECT_NE(connect_res, -1); if (connect_res == -1) - return; - - // Socket readiness for connection is checked by polling on WRITE allowed sockets. - - if (connect_res != -1) { - int rlen = 2; - SRTSOCKET read[2]; - - int wlen = 2; - SRTSOCKET write[2]; - - std::cout << "[T/C] Waiting for connection readiness...\n"; - - EXPECT_NE(srt_epoll_wait(client_pollid, read, &rlen, - write, &wlen, - -1, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR); - - - EXPECT_EQ(rlen, 0); // get exactly one write event without reads - EXPECT_EQ(wlen, 1); // get exactly one write event without reads - EXPECT_EQ(write[0], g_client_sock); // for our client socket + std::cout << "srt_connect: " << srt_getlasterror_str() << std::endl; + } - char buffer[1316] = {1, 2, 3, 4}; - EXPECT_NE(srt_sendmsg(g_client_sock, buffer, sizeof buffer, - -1, // infinit ttl - true // in order must be set to true - ), - SRT_ERROR); + if (expect_success) + { + EXPECT_NE(connect_res, -1); + if (connect_res == -1) + return; + + // Socket readiness for connection is checked by polling on WRITE allowed sockets. + + if (connect_res != -1) + { + int rlen = 2; + SRTSOCKET read[2]; + + int wlen = 2; + SRTSOCKET write[2]; + + std::cout << "[T/C] Waiting for connection readiness...\n"; + + EXPECT_NE(srt_epoll_wait(m_client_pollid, read, &rlen, + write, &wlen, + -1, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR); + + + EXPECT_EQ(rlen, 0); // get exactly one write event without reads + EXPECT_EQ(wlen, 1); // get exactly one write event without reads + EXPECT_EQ(write[0], client_sock); // for our client socket + + char buffer[1316] = {1, 2, 3, 4}; + EXPECT_NE(srt_sendmsg(client_sock, buffer, sizeof buffer, + -1, // infinit ttl + true // in order must be set to true + ), + SRT_ERROR); + } + else + { + std::cout << "[T/C] (NOT TESTING TRANSMISSION - CONNECTION FAILED ALREADY)\n"; + } } else { - std::cout << "[T/C] (NOT TESTING TRANSMISSION - CONNECTION FAILED ALREADY)\n"; + EXPECT_EQ(connect_res, -1); } - } - else - { - EXPECT_EQ(connect_res, -1); - } - std::cout << "[T/C] Client exit\n"; -} + std::cout << "[T/C] Client exit - should close @" << client_sock << "\n"; + } -int server_pollid = SRT_ERROR; + SRTSOCKET prepareSocket() + { + SRTSOCKET bindsock = srt_create_socket(); + EXPECT_NE(bindsock, SRT_ERROR); -SRTSOCKET prepareSocket() -{ - SRTSOCKET bindsock = srt_create_socket(); - EXPECT_NE(bindsock, SRT_ERROR); + int yes = 1; + int no = 0; - int yes = 1; - int no = 0; + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + int epoll_in = SRT_EPOLL_IN; - int epoll_in = SRT_EPOLL_IN; + std::cout << "[T/S] Listener/binder sock @" << bindsock << " added to m_server_pollid\n"; + srt_epoll_add_usock(m_server_pollid, bindsock, &epoll_in); - std::cout << "[T/S] Listener/binder sock @" << bindsock << " added to server_pollid\n"; - srt_epoll_add_usock(server_pollid, bindsock, &epoll_in); + return bindsock; + } - return bindsock; -} + bool bindSocket(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) + { + sockaddr_any sa = srt::CreateAddr(ip, port, AF_INET); -bool bindSocket(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ - sockaddr_any sa = srt::CreateAddr(ip, port, AF_INET); + std::string fam = (sa.family() == AF_INET) ? "IPv4" : "IPv6"; - std::string fam = (sa.family() == AF_INET) ? "IPv4" : "IPv6"; + std::cout << "[T/S] Bind @" << bindsock << " to: " << sa.str() << " (" << fam << ")" << std::endl; - std::cout << "[T/S] Bind @" << bindsock << " to: " << sa.str() << " (" << fam << ")" << std::endl; + int bind_res = srt_bind(bindsock, sa.get(), sa.size()); - int bind_res = srt_bind(bindsock, sa.get(), sa.size()); + std::cout << "[T/S] ... result " << bind_res << " (expected to " + << (expect_success ? "succeed" : "fail") << ")\n"; - std::cout << "[T/S] ... result " << bind_res << " (expected to " - << (expect_success ? "succeed" : "fail") << ")\n"; + if (!expect_success) + { + std::cout << "[T/S] Binding should fail: " << srt_getlasterror_str() << std::endl; + EXPECT_EQ(bind_res, SRT_ERROR); + return false; + } - if (!expect_success) - { - std::cout << "[T/S] Binding should fail: " << srt_getlasterror_str() << std::endl; - EXPECT_EQ(bind_res, SRT_ERROR); - return false; + EXPECT_NE(bind_res, SRT_ERROR); + return true; } - EXPECT_NE(bind_res, SRT_ERROR); - return true; -} + bool bindListener(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) + { + if (!bindSocket(bindsock, ip, port, expect_success)) + return false; -bool bindListener(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ - if (!bindSocket(bindsock, ip, port, expect_success)) - return false; + EXPECT_NE(srt_listen(bindsock, SOMAXCONN), SRT_ERROR); - EXPECT_NE(srt_listen(bindsock, SOMAXCONN), SRT_ERROR); + return true; + } - return true; -} + SRTSOCKET createListener(std::string ip, int port, bool expect_success) + { + std::cout << "[T/S] serverSocket: creating listener socket\n"; -SRTSOCKET createListener(std::string ip, int port, bool expect_success) -{ - std::cout << "[T/S] serverSocket: creating listener socket\n"; + SRTSOCKET bindsock = prepareSocket(); - SRTSOCKET bindsock = prepareSocket(); + if (!bindListener(bindsock, ip, port, expect_success)) + return SRT_INVALID_SOCK; - if (!bindListener(bindsock, ip, port, expect_success)) - return SRT_INVALID_SOCK; + return bindsock; + } - return bindsock; -} + SRTSOCKET createBinder(std::string ip, int port, bool expect_success) + { + std::cout << "[T/S] serverSocket: creating binder socket\n"; -SRTSOCKET createBinder(std::string ip, int port, bool expect_success) -{ - std::cout << "[T/S] serverSocket: creating binder socket\n"; + SRTSOCKET bindsock = prepareSocket(); - SRTSOCKET bindsock = prepareSocket(); + if (!bindSocket(bindsock, ip, port, expect_success)) + { + srt_close(bindsock); + return SRT_INVALID_SOCK; + } - if (!bindSocket(bindsock, ip, port, expect_success)) - { - srt_close(bindsock); - return SRT_INVALID_SOCK; + return bindsock; } - return bindsock; -} + void testAccept(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) + { -void testAccept(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) -{ + auto run = [this, ip, port, expect_success]() { clientSocket(ip, port, expect_success); }; - auto run = [ip, port, expect_success]() { clientSocket(ip, port, expect_success); }; + auto launched = std::async(std::launch::async, run); - auto launched = std::async(std::launch::async, run); + AtReturnJoin atreturn_join {launched}; - AtReturnJoin atreturn_join {launched}; + { // wait for connection from client + int rlen = 2; + SRTSOCKET read[2]; - { // wait for connection from client - int rlen = 2; - SRTSOCKET read[2]; + int wlen = 2; + SRTSOCKET write[2]; - int wlen = 2; - SRTSOCKET write[2]; + std::cout << "[T/S] Wait 10s for acceptance on @" << bindsock << " ...\n"; - std::cout << "[T/S] Wait 10s for acceptance on @" << bindsock << " ...\n"; + ASSERT_NE(srt_epoll_wait(m_server_pollid, + read, &rlen, + write, &wlen, + 10000, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR ); - ASSERT_NE(srt_epoll_wait(server_pollid, - read, &rlen, - write, &wlen, - 10000, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + ASSERT_EQ(rlen, 1); // get exactly one read event without writes + ASSERT_EQ(wlen, 0); // get exactly one read event without writes + ASSERT_EQ(read[0], bindsock); // read event is for bind socket + } - ASSERT_EQ(rlen, 1); // get exactly one read event without writes - ASSERT_EQ(wlen, 0); // get exactly one read event without writes - ASSERT_EQ(read[0], bindsock); // read event is for bind socket - } + { + sockaddr_any scl; + srt::UniqueSocket accepted_sock = srt_accept(bindsock, scl.get(), &scl.len); + if (accepted_sock == -1) + { + std::cout << "srt_accept: " << srt_getlasterror_str() << std::endl; + } + ASSERT_NE(accepted_sock.ref(), SRT_INVALID_SOCK); - sockaddr_any scl; + sockaddr_any showacp = (sockaddr*)&scl; + std::cout << "[T/S] Accepted from: " << showacp.str() << std::endl; - SRTSOCKET accepted_sock = srt_accept(bindsock, scl.get(), &scl.len); - if (accepted_sock == -1) - { - std::cout << "srt_accept: " << srt_getlasterror_str() << std::endl; - } - ASSERT_NE(accepted_sock, SRT_INVALID_SOCK); + int epoll_in = SRT_EPOLL_IN; + srt_epoll_add_usock(m_server_pollid, accepted_sock, &epoll_in); // wait for input - sockaddr_any showacp = (sockaddr*)&scl; - std::cout << "[T/S] Accepted from: " << showacp.str() << std::endl; + char buffer[1316]; + { // wait for 1316 packet from client + int rlen = 2; + SRTSOCKET read[2]; - int epoll_in = SRT_EPOLL_IN; - srt_epoll_add_usock(server_pollid, accepted_sock, &epoll_in); // wait for input + int wlen = 2; + SRTSOCKET write[2]; - char buffer[1316]; - { // wait for 1316 packet from client - int rlen = 2; - SRTSOCKET read[2]; + std::cout << "[T/S] Wait for data reception...\n"; - int wlen = 2; - SRTSOCKET write[2]; + ASSERT_NE(srt_epoll_wait(m_server_pollid, + read, &rlen, + write, &wlen, + -1, // -1 is set for debuging purpose. + // in case of production we need to set appropriate value + 0, 0, 0, 0), SRT_ERROR ); - std::cout << "[T/S] Wait for data reception...\n"; - ASSERT_NE(srt_epoll_wait(server_pollid, - read, &rlen, - write, &wlen, - -1, // -1 is set for debuging purpose. - // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + ASSERT_EQ(rlen, 1); // get exactly one read event without writes + ASSERT_EQ(wlen, 0); // get exactly one read event without writes + ASSERT_EQ(read[0], accepted_sock.ref()); // read event is for bind socket + } + char pattern[4] = {1, 2, 3, 4}; - ASSERT_EQ(rlen, 1); // get exactly one read event without writes - ASSERT_EQ(wlen, 0); // get exactly one read event without writes - ASSERT_EQ(read[0], accepted_sock); // read event is for bind socket - } + ASSERT_EQ(srt_recvmsg(accepted_sock, buffer, sizeof buffer), + 1316); - char pattern[4] = {1, 2, 3, 4}; + EXPECT_EQ(memcmp(pattern, buffer, sizeof pattern), 0); - ASSERT_EQ(srt_recvmsg(accepted_sock, buffer, sizeof buffer), - 1316); + std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << " LSN:@" << bindsock << "...\n"; + } + // client_sock closed through UniqueSocket. + // cannot close client_sock after srt_sendmsg because of issue in api.c:2346 - EXPECT_EQ(memcmp(pattern, buffer, sizeof pattern), 0); + std::cout << "[T/S] joining client async (should close client socket)...\n"; + launched.get(); + } - std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << " LSN:@" << bindsock << " CLR:@" << g_client_sock << " ...\n"; - ASSERT_NE(srt_close(accepted_sock), SRT_ERROR); - ASSERT_NE(srt_close(g_client_sock), SRT_ERROR); // cannot close g_client_sock after srt_sendmsg because of issue in api.c:2346 + static void shutdownListener(SRTSOCKET bindsock) + { + // Silently ignore. Usually it should have been checked earlier, + // and an invalid sock might be expected in particular tests. + if (bindsock == SRT_INVALID_SOCK) + return; - std::cout << "[T/S] joining client async...\n"; - launched.get(); -} + int yes = 1; + EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &yes, sizeof yes), SRT_ERROR); // for async connect + EXPECT_NE(srt_close(bindsock), SRT_ERROR); -void shutdownListener(SRTSOCKET bindsock) -{ - // Silently ignore. Usually it should have been checked earlier, - // and an invalid sock might be expected in particular tests. - if (bindsock == SRT_INVALID_SOCK) - return; + std::chrono::milliseconds check_period (100); + int credit = 400; // 10 seconds + auto then = std::chrono::steady_clock::now(); - int yes = 1; - EXPECT_NE(srt_setsockopt(bindsock, 0, SRTO_RCVSYN, &yes, sizeof yes), SRT_ERROR); // for async connect - EXPECT_NE(srt_close(bindsock), SRT_ERROR); + std::cout << "[T/S] waiting for cleanup of @" << bindsock << " up to 10s" << std::endl; + while (srt_getsockstate(bindsock) != SRTS_NONEXIST) + { + std::this_thread::sleep_for(check_period); + --credit; + if (!credit) + break; + } + auto now = std::chrono::steady_clock::now(); + auto dur = std::chrono::duration_cast(now - then); - std::chrono::milliseconds check_period (100); - int credit = 400; // 10 seconds - auto then = std::chrono::steady_clock::now(); + // Keep as single string because this tends to be mixed from 2 threads. + std::ostringstream sout; + sout << "[T/S] @" << bindsock << " dissolved after " + << (dur.count() / 1000.0) << "s" << std::endl; + std::cout << sout.str() << std::flush; - std::cout << "[T/S] waiting for cleanup of @" << bindsock << " up to 10s" << std::endl; - while (srt_getsockstate(bindsock) != SRTS_NONEXIST) - { - std::this_thread::sleep_for(check_period); - --credit; - if (!credit) - break; + EXPECT_NE(credit, 0); } - auto now = std::chrono::steady_clock::now(); - auto dur = std::chrono::duration_cast(now - then); - // Keep as single string because this tends to be mixed from 2 threads. - std::ostringstream sout; - sout << "[T/S] @" << bindsock << " dissolved after " - << (dur.count() / 1000.0) << "s" << std::endl; - std::cout << sout.str() << std::flush; +private: - EXPECT_NE(credit, 0); -} + void setup() + { + m_client_pollid = srt_epoll_create(); + ASSERT_NE(SRT_ERROR, m_client_pollid); -TEST(ReuseAddr, SameAddr1) -{ - srt::TestInit srtinit; + m_server_pollid = srt_epoll_create(); + ASSERT_NE(SRT_ERROR, m_server_pollid); + } - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); + void teardown() + { + (void)srt_epoll_release(m_client_pollid); + (void)srt_epoll_release(m_server_pollid); + } +}; - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); +TEST_F(ReuseAddr, SameAddr1) +{ SRTSOCKET bindsock_1 = createBinder("127.0.0.1", 5000, true); SRTSOCKET bindsock_2 = createListener("127.0.0.1", 5000, true); @@ -413,24 +428,14 @@ TEST(ReuseAddr, SameAddr1) s1.join(); s2.join(); - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, SameAddr2) +TEST_F(ReuseAddr, SameAddr2) { - srt::TestInit srtinit; std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createBinder(localip, 5000, true); SRTSOCKET bindsock_2 = createListener(localip, 5000, true); @@ -445,21 +450,11 @@ TEST(ReuseAddr, SameAddr2) testAccept(bindsock_3, localip, 5000, true); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, SameAddrV6) +TEST_F(ReuseAddr, SameAddrV6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); SRTSOCKET bindsock_1 = createBinder("::1", 5000, true); SRTSOCKET bindsock_2 = createListener("::1", 5000, true); @@ -475,26 +470,15 @@ TEST(ReuseAddr, SameAddrV6) testAccept(bindsock_3, "::1", 5000, true); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, DiffAddr) +TEST_F(ReuseAddr, DiffAddr) { - srt::TestInit srtinit; std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createBinder("127.0.0.1", 5000, true); SRTSOCKET bindsock_2 = createListener(localip, 5000, true); @@ -505,14 +489,10 @@ TEST(ReuseAddr, DiffAddr) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, Wildcard) +TEST_F(ReuseAddr, Wildcard) { - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; @@ -524,14 +504,6 @@ TEST(ReuseAddr, Wildcard) std::string localip = GetLocalIP(AF_INET); if (localip == "") return; // DISABLE TEST if this doesn't work. - - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // Binding a certain address when wildcard is already bound should fail. @@ -541,15 +513,11 @@ TEST(ReuseAddr, Wildcard) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, Wildcard6) +TEST_F(ReuseAddr, Wildcard6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; @@ -567,13 +535,6 @@ TEST(ReuseAddr, Wildcard6) // performed there. std::string localip_v4 = GetLocalIP(AF_INET); - - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - // This must be obligatory set before binding a socket to "::" int strict_ipv6 = 1; @@ -620,28 +581,18 @@ TEST(ReuseAddr, Wildcard6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); shutdownListener(bindsock_3); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, ProtocolVersion6) +TEST_F(ReuseAddr, ProtocolVersion6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; return; #endif - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. @@ -659,27 +610,18 @@ TEST(ReuseAddr, ProtocolVersion6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } -TEST(ReuseAddr, ProtocolVersionFaux6) +TEST_F(ReuseAddr, ProtocolVersionFaux6) { SRTST_REQUIRES(IPv6); - srt::TestInit srtinit; + #if defined(_WIN32) || defined(CYGWIN) std::cout << "!!!WARNING!!!: On Windows connection to localhost this way isn't possible.\n" "Forcing test to pass, PLEASE FIX.\n"; return; #endif - client_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, client_pollid); - - server_pollid = srt_epoll_create(); - ASSERT_NE(SRT_ERROR, server_pollid); - SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. @@ -696,7 +638,4 @@ TEST(ReuseAddr, ProtocolVersionFaux6) shutdownListener(bindsock_1); shutdownListener(bindsock_2); - - (void)srt_epoll_release(client_pollid); - (void)srt_epoll_release(server_pollid); } From 74592c11ffa632c72a4e5a55086e1f4e512bf591 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 27 Feb 2024 16:42:35 +0100 Subject: [PATCH 03/16] Fixed more tests --- test/test_file_transmission.cpp | 16 ++++-- test/test_reuseaddr.cpp | 92 +++++++++++++++++++++++---------- test/test_sync.cpp | 3 +- 3 files changed, 78 insertions(+), 33 deletions(-) diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index a538683dd..b5da38636 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -25,6 +25,7 @@ #include #include #include +#include //#pragma comment (lib, "ws2_32.lib") @@ -94,7 +95,7 @@ TEST(Transmission, FileUpload) // Start listener-receiver thread - bool thread_exit = false; + std::atomic thread_exit { false }; auto client = std::thread([&] { @@ -117,12 +118,17 @@ TEST(Transmission, FileUpload) for (;;) { int n = srt_recv(accepted_sock, buf.data(), 1456); - ASSERT_NE(n, SRT_ERROR); + EXPECT_NE(n, SRT_ERROR) << srt_getlasterror_str(); if (n == 0) { std::cerr << "Received 0 bytes, breaking.\n"; break; } + else if (n == -1) + { + std::cerr << "READ FAILED, breaking anyway\n"; + break; + } // Write to file any amount of data received copyfile.write(buf.data(), n); @@ -183,8 +189,10 @@ TEST(Transmission, FileUpload) std::cout << "Comparing files\n"; // Compare files - tarfile.seekg(0, std::ios::end); - ifile.seekg(0, std::ios::beg); + tarfile.seekg(0, std::ios::beg); + + ifile.close(); + ifile.open("file.source", std::ios::in | std::ios::binary); for (size_t i = 0; i < tar_size; ++i) { diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index 8779a3a79..54499ba79 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -1,5 +1,6 @@ #include #include +#include #ifndef _WIN32 #include #endif @@ -103,11 +104,45 @@ static std::string GetLocalIP(int af = AF_UNSPEC) class ReuseAddr : public srt::Test { - int m_client_pollid = SRT_ERROR; int m_server_pollid = SRT_ERROR; - protected: + + std::string showEpollContents(const char* label, int* array, int length) + { + std::ostringstream out; + out << label << ":["; + if (length) + { + // Now is at least 1 + out << "@" << array[0]; + + for (int i = 1; i < length; ++i) + out << " @" << array[i]; + } + out << "]"; + return out.str(); + } + + struct UniquePollid + { + int pollid = SRT_ERROR; + UniquePollid() + { + pollid = srt_epoll_create(); + } + + ~UniquePollid() + { + srt_epoll_release(pollid); + } + + operator int() const + { + return pollid; + } + }; + void clientSocket(SRTSOCKET client_sock, std::string ip, int port, bool expect_success) { using namespace std; @@ -132,8 +167,11 @@ class ReuseAddr : public srt::Test EXPECT_NE(srt_setsockflag(client_sock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); EXPECT_NE(srt_setsockflag(client_sock, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + UniquePollid client_pollid; + ASSERT_NE(int(client_pollid), SRT_ERROR); + int epoll_out = SRT_EPOLL_OUT; - srt_epoll_add_usock(m_client_pollid, client_sock, &epoll_out); + srt_epoll_add_usock(client_pollid, client_sock, &epoll_out); sockaddr_any sa = srt::CreateAddr(ip, port, family); @@ -164,14 +202,14 @@ class ReuseAddr : public srt::Test cout << "[T/C] Waiting for connection readiness...\n"; - EXPECT_NE(srt_epoll_wait(m_client_pollid, read, &rlen, + EXPECT_NE(srt_epoll_wait(client_pollid, read, &rlen, write, &wlen, -1, // -1 is set for debuging purpose. // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR); + 0, 0, 0, 0), SRT_ERROR) << srt_getlasterror_str(); - EXPECT_EQ(rlen, 0) << "[T/C] read-ready"; // get exactly one write event without reads - EXPECT_EQ(wlen, 1) << "[T/C] write-ready"; // get exactly one write event without reads + EXPECT_EQ(rlen, 0) << showEpollContents("[T/C] R", read, rlen); // get exactly one write event without reads + EXPECT_EQ(wlen, 1) << showEpollContents("[T/C] W", write, wlen); // get exactly one write event without reads EXPECT_EQ(write[0], client_sock); // for our client socket char buffer[1316] = {1, 2, 3, 4}; @@ -194,7 +232,7 @@ class ReuseAddr : public srt::Test cout << "[T/C] Client exit\n"; } - SRTSOCKET prepareSocket() + SRTSOCKET prepareServerSocket() { SRTSOCKET bindsock = srt_create_socket(); EXPECT_NE(bindsock, SRT_ERROR); @@ -251,7 +289,7 @@ class ReuseAddr : public srt::Test { std::cout << "[T/S] serverSocket: creating listener socket\n"; - SRTSOCKET bindsock = prepareSocket(); + SRTSOCKET bindsock = prepareServerSocket(); if (!bindListener(bindsock, ip, port, expect_success)) return SRT_INVALID_SOCK; @@ -263,7 +301,7 @@ class ReuseAddr : public srt::Test { std::cout << "[T/S] serverSocket: creating binder socket\n"; - SRTSOCKET bindsock = prepareSocket(); + SRTSOCKET bindsock = prepareServerSocket(); if (!bindSocket(bindsock, ip, port, expect_success)) { @@ -291,18 +329,18 @@ class ReuseAddr : public srt::Test int wlen = 2; SRTSOCKET write[2]; - std::cout << "[T/S] Wait 10s for acceptance on @" << bindsock << " ...\n"; + std::cout << "[T/S] Wait 10s on E" << m_server_pollid << " for acceptance on @" << bindsock << " ...\n"; EXPECT_NE(srt_epoll_wait(m_server_pollid, read, &rlen, write, &wlen, 10000, // -1 is set for debuging purpose. // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + 0, 0, 0, 0), SRT_ERROR) << srt_getlasterror_str(); - EXPECT_EQ(rlen, 1); // get exactly one read event without writes - EXPECT_EQ(wlen, 0); // get exactly one read event without writes + EXPECT_EQ(rlen, 1) << showEpollContents("[T/S] R", read, rlen); // get exactly one read event without writes + EXPECT_EQ(wlen, 0) << showEpollContents("[T/S] W", write, wlen); // get exactly one read event without writes ASSERT_EQ(read[0], bindsock); // read event is for bind socket } @@ -336,11 +374,11 @@ class ReuseAddr : public srt::Test write, &wlen, -1, // -1 is set for debuging purpose. // in case of production we need to set appropriate value - 0, 0, 0, 0), SRT_ERROR ); + 0, 0, 0, 0), SRT_ERROR) << srt_getlasterror_str(); - EXPECT_EQ(rlen, 1) << "[T/S] read-ready"; // get exactly one read event without writes - EXPECT_EQ(wlen, 0) << "[T/S] write-ready"; // get exactly one read event without writes + EXPECT_EQ(rlen, 1) << showEpollContents("[T/S] R", read, rlen); // get exactly one read event without writes + EXPECT_EQ(wlen, 0) << showEpollContents("[T/S] W", write, wlen); // get exactly one read event without writes EXPECT_EQ(read[0], accepted_sock.ref()); // read event is for bind socket } @@ -356,8 +394,9 @@ class ReuseAddr : public srt::Test // client_sock closed through UniqueSocket. // cannot close client_sock after srt_sendmsg because of issue in api.c:2346 - std::cout << "[T/S] joining client async (should close client socket)...\n"; + std::cout << "[T/S] joining client async \n"; launched.get(); + std::cout << "[T/S] closing client socket\n"; } static void shutdownListener(SRTSOCKET bindsock) @@ -399,17 +438,14 @@ class ReuseAddr : public srt::Test void setup() { - m_client_pollid = srt_epoll_create(); - ASSERT_NE(m_client_pollid, SRT_ERROR); - m_server_pollid = srt_epoll_create(); ASSERT_NE(m_server_pollid, SRT_ERROR); } void teardown() { - (void)srt_epoll_release(m_client_pollid); (void)srt_epoll_release(m_server_pollid); + m_server_pollid = SRT_ERROR; } }; @@ -537,7 +573,7 @@ TEST_F(ReuseAddr, Wildcard6) // This must be obligatory set before binding a socket to "::" int strict_ipv6 = 1; - SRTSOCKET bindsock_1 = prepareSocket(); + SRTSOCKET bindsock_1 = prepareServerSocket(); srt_setsockflag(bindsock_1, SRTO_IPV6ONLY, &strict_ipv6, sizeof strict_ipv6); bindListener(bindsock_1, "::", 5000, true); @@ -561,7 +597,7 @@ TEST_F(ReuseAddr, Wildcard6) strict_ipv6 = 0; - bindsock_1 = prepareSocket(); + bindsock_1 = prepareServerSocket(); srt_setsockflag(bindsock_1, SRTO_IPV6ONLY, &strict_ipv6, sizeof strict_ipv6); bindListener(bindsock_1, "::", 5000, true); @@ -595,8 +631,8 @@ TEST_F(ReuseAddr, ProtocolVersion6) SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. - // createListener = prepareSocket | bindListener - SRTSOCKET bindsock_2 = prepareSocket(); + // createListener = prepareServerSocket | bindListener + SRTSOCKET bindsock_2 = prepareServerSocket(); { int yes = 1; @@ -624,8 +660,8 @@ TEST_F(ReuseAddr, ProtocolVersionFaux6) SRTSOCKET bindsock_1 = createListener("0.0.0.0", 5000, true); // We need a small interception in this one. - // createListener = prepareSocket | bindListener - SRTSOCKET bindsock_2 = prepareSocket(); + // createListener = prepareServerSocket | bindListener + SRTSOCKET bindsock_2 = prepareServerSocket(); { int no = 0; diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 844705ea6..e0454a581 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -587,7 +587,8 @@ TEST(SyncEvent, WaitForNotifyAll) /*****************************************************************************/ void* dummythread(void* param) { - *(bool*)(param) = true; + auto& thread_finished = *(srt::sync::atomic*)param; + thread_finished = true; return nullptr; } From 2075a65e114fb848223c4e15897b29d408abb3ad Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Tue, 27 Feb 2024 16:45:38 +0100 Subject: [PATCH 04/16] Fixed tab indent in CMakeLists --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0bcc62a86..8951dc46c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -73,7 +73,7 @@ if (NOT MICROSOFT) # that ENABLE_DEBUG is set as it should. if (ENABLE_DEBUG EQUAL 2) set (CMAKE_BUILD_TYPE "RelWithDebInfo") - if (ENABLE_ASSERT) + if (ENABLE_ASSERT) # Add _DEBUG macro if explicitly requested, to enable SRT_ASSERT(). add_definitions(-D_DEBUG) else() From a82fd26a52d79b9b5a3f40f88320315ea6499a4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 28 Feb 2024 10:23:43 +0100 Subject: [PATCH 05/16] Added extra fixes for data races --- srtcore/api.cpp | 53 +++++++++++++++++++---------------------------- srtcore/core.cpp | 13 ++++++++---- srtcore/queue.cpp | 7 +++---- 3 files changed, 33 insertions(+), 40 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 5213d174f..f0952a8da 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1905,7 +1905,7 @@ int srt::CUDTUnited::close(const SRTSOCKET u) return 0; } #endif - +#if ENABLE_HEAVY_LOGGING // Wrapping the log into a destructor so that it // is printed AFTER the destructor of SocketKeeper. struct ForceDestructor @@ -1916,14 +1916,15 @@ int srt::CUDTUnited::close(const SRTSOCKET u) { if (ps) // Could be not acquired by SocketKeeper, occasionally { - LOGC(smlog.Note, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); + HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); } } } fod; +#endif SocketKeeper k(*this, u, ERH_THROW); - fod.ps = k.socket; - LOGC(smlog.Note, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy()); + IF_HEAVY_LOGGING(fod.ps = k.socket); + HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy()); int ret = close(k.socket); return ret; @@ -2573,37 +2574,25 @@ srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling bool srt::CUDTUnited::acquireSocket(CUDTSocket* s) { - /* + // Note that before using this function you must be certain + // that the socket isn't broken already and it still has at least + // one more GC cycle to live. In other words, you must be certain + // that this pointer passed here isn't dangling and was obtained + // directly from m_Sockets, or even better, has been acquired + // by some other functionality already, which is only about to + // be released earlier than you need. ScopedLock cg(m_GlobControlLock); - - // JUST IN CASE, try to find the socket in m_Sockets (NOT in m_ClosedSockets). - // If it's not there, just pretend it's already deleted. The m_ClosedSockets - // is a container that keeps sockets that are being still in use of other threads, - // but were dispatched or acquired before another activity has requested them - // to be deleted. - - // This uses simply a pointer value, so it should be safe even if the pointer - // was dangling. We state it's virtually impossible for a non-paused thread - // to get in a situation of having the reclaimed memory after a socket removed - // from m_Sockets to be reassigned to another new socket. Note that the socket - // being normally passed here as argument is a socket that was previously - // dispatched from somewhere else. - - for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i) + s->apiAcquire(); + // Keep the lock so that no one changes anything in the meantime. + // If the socket m_Status == SRTS_CLOSED (set by setClosed()), then + // this socket is no longer present in the m_Sockets container + if (s->m_Status >= SRTS_BROKEN) { - CUDTSocket* is = i->second; - - if (s == is) - { - */ - s->apiAcquire(); - return true; - /* - } + s->apiRelease(); + return false; } - s->apiAcquire(); - return false; - */ + + return true; } srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 280b6adb6..255fcb5c6 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7087,7 +7087,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_ do { - if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now())) + if (stillConnected() && !timeout && !isRcvBufferReady()) { /* Kick TsbPd thread to schedule next wakeup (if running) */ if (m_bTsbPd) @@ -8726,7 +8726,10 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr // srt_recvfile (which doesn't make any sense), you'll have a deadlock. if (m_config.bDriftTracer) { + enterCS(m_RcvBufferLock); const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); + leaveCS(m_RcvBufferLock); + #if ENABLE_BONDING if (drift_updated && m_parent->m_GroupOf) { @@ -9997,10 +10000,12 @@ int srt::CUDT::checkLazySpawnTsbPdThread() { const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd; - if (need_tsbpd && !m_RcvTsbPdThread.joinable()) - { - ScopedLock lock(m_RcvTsbPdStartupLock); + if (!need_tsbpd) + return 0; + ScopedLock lock(m_RcvTsbPdStartupLock); + if (!m_RcvTsbPdThread.joinable()) + { if (m_bClosing) // Check again to protect join() in CUDT::releaseSync() return -1; diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 37819de66..a7df820ec 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -936,7 +936,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst EReadStatus read_st = rst; EConnectStatus conn_st = cst; - /* + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id); if (!sk.socket) { @@ -945,7 +945,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst toRemove.push_back(*i); continue; } - // */ + if (cst != CONN_RENDEZVOUS && dest_id != 0) { @@ -993,7 +993,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst { HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id); remove(i->id); - /* + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id); if (!sk.socket) { @@ -1001,7 +1001,6 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents"); continue; } - // */ // Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue // because the next CUDT::close will not remove it from the queue when m_bConnecting = false, From 278fc72a0b071816928409604b32d9f9442ff828 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 28 Feb 2024 12:54:56 +0100 Subject: [PATCH 06/16] Fixed file transmission test on Windows --- test/test_file_transmission.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index b5da38636..bfd668ac7 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -180,7 +180,7 @@ TEST(Transmission, FileUpload) std::cout << "Sockets closed, joining receiver thread\n"; client.join(); - std::ifstream tarfile("file.target"); + std::ifstream tarfile("file.target", std::ios::in | std::ios::binary); EXPECT_EQ(!!tarfile, true); tarfile.seekg(0, std::ios::end); @@ -189,7 +189,11 @@ TEST(Transmission, FileUpload) std::cout << "Comparing files\n"; // Compare files - tarfile.seekg(0, std::ios::beg); + + // Theoretically it should work if you just rewind to 0, but + // on Windows this somehow doesn't work. + tarfile.close(); + tarfile.open("file.target", std::ios::in | std::ios::binary); ifile.close(); ifile.open("file.source", std::ios::in | std::ios::binary); From cd5d7713b04f225543dc006579bc0de3b36e5cb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 28 Feb 2024 14:30:24 +0100 Subject: [PATCH 07/16] Fixed some build breaks --- srtcore/api.cpp | 4 ++-- srtcore/api.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index f0952a8da..0c660a3ee 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2661,7 +2661,7 @@ void srt::CUDTUnited::checkBrokenSockets() if (s->m_Status == SRTS_LISTENING) { - const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp; + const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load(); // A listening socket should wait an extra 3 seconds // in case a client is connecting. if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS)) @@ -2746,7 +2746,7 @@ void srt::CUDTUnited::checkBrokenSockets() // timeout 1 second to destroy a socket AND it has been removed from // RcvUList const steady_clock::time_point now = steady_clock::now(); - const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp; + const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load(); if (closed_ago > seconds_from(1)) { CRNode* rnode = u.m_pRNode; diff --git a/srtcore/api.h b/srtcore/api.h index 30c67aa25..963452313 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -491,7 +491,7 @@ class CUDTUnited { if (socket) { - SRT_ASSERT(socket.m_iBusy > 0); + SRT_ASSERT(socket->isStillBusy() > 0); socket->apiRelease(); // Only now that the group lock is lifted, can the // group be now deleted and this pointer potentially dangling From 085043a3719e0251f89a33ebf0a2d28ed4d11d59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 28 Feb 2024 16:17:47 +0100 Subject: [PATCH 08/16] Armed UniqueSocket in file/line grab for creation location --- test/test_env.h | 11 +++++++++-- test/test_epoll.cpp | 22 +++++++++++----------- test/test_main.cpp | 2 +- test/test_reuseaddr.cpp | 5 +++-- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/test/test_env.h b/test/test_env.h index 68ec516b3..1db98a054 100644 --- a/test/test_env.h +++ b/test/test_env.h @@ -63,15 +63,22 @@ class TestInit class UniqueSocket { int32_t sock; + std::string lab, f; + int l; public: - UniqueSocket(int32_t s): sock(s) + UniqueSocket(int32_t s, const char* label, const char* file, int line): sock(s) { if (s == -1) throw std::invalid_argument("Invalid socket"); + lab = label; + f = file; + l = line; } - UniqueSocket(): sock(-1) +#define MAKE_UNIQUE_SOCK(name, label, expr) srt::UniqueSocket name (expr, label, __FILE__, __LINE__) + + UniqueSocket(): sock(-1), l(0) { } diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index 116978c08..a8a88aa7a 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -66,7 +66,7 @@ TEST(CEPoll, WaitEmptyCall) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int no = 0; @@ -89,7 +89,7 @@ TEST(CEPoll, UWaitEmptyCall) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int no = 0; @@ -112,7 +112,7 @@ TEST(CEPoll, WaitAllSocketsInEpollReleased) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -146,7 +146,7 @@ TEST(CEPoll, WaitAllSocketsInEpollReleased2) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -174,7 +174,7 @@ TEST(CEPoll, WrongEpoll_idOnAddUSock) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int no = 0; @@ -197,7 +197,7 @@ TEST(CEPoll, HandleEpollEvent) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -258,7 +258,7 @@ TEST(CEPoll, NotifyConnectionBreak) srt::TestInit srtinit; // 1. Prepare client - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); ASSERT_NE(client_sock, SRT_ERROR); const int yes SRT_ATR_UNUSED = 1; @@ -280,7 +280,7 @@ TEST(CEPoll, NotifyConnectionBreak) ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa_client.sin_addr), 1); // 2. Prepare server - srt::UniqueSocket server_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(server_sock, "server_sock", srt_create_socket()); ASSERT_NE(server_sock, SRT_ERROR); ASSERT_NE(srt_setsockopt(server_sock, 0, SRTO_RCVSYN, &no, sizeof no), SRT_ERROR); // for async connect @@ -372,7 +372,7 @@ TEST(CEPoll, HandleEpollEvent2) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -433,7 +433,7 @@ TEST(CEPoll, HandleEpollNoEvent) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int yes = 1; @@ -483,7 +483,7 @@ TEST(CEPoll, ThreadedUpdate) { srt::TestInit srtinit; - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "client", srt_create_socket()); EXPECT_NE(client_sock, SRT_ERROR); const int no = 0; diff --git a/test/test_main.cpp b/test/test_main.cpp index cc536802f..a25c8ef38 100644 --- a/test/test_main.cpp +++ b/test/test_main.cpp @@ -183,7 +183,7 @@ sockaddr_any CreateAddr(const std::string& name, unsigned short port, int pref_f UniqueSocket::~UniqueSocket() { - EXPECT_NE(srt_close(sock), SRT_ERROR); + EXPECT_NE(srt_close(sock), SRT_ERROR) << lab << " CREATED:"<< f << ":" << l; } } diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index 54499ba79..087b6c6ab 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -314,7 +314,7 @@ class ReuseAddr : public srt::Test void testAccept(SRTSOCKET bindsock, std::string ip, int port, bool expect_success) { - srt::UniqueSocket client_sock = srt_create_socket(); + MAKE_UNIQUE_SOCK(client_sock, "[T/C]connect", srt_create_socket()); auto run = [this, &client_sock, ip, port, expect_success]() { clientSocket(client_sock, ip, port, expect_success); }; @@ -346,7 +346,8 @@ class ReuseAddr : public srt::Test { sockaddr_any scl; - srt::UniqueSocket accepted_sock = srt_accept(bindsock, scl.get(), &scl.len); + MAKE_UNIQUE_SOCK(accepted_sock, "[T/S]accept", srt_accept(bindsock, scl.get(), &scl.len)); + if (accepted_sock == -1) { std::cout << "srt_accept: " << srt_getlasterror_str() << std::endl; From 4638192e470c5a0117ecfeb65fafe7ee55bba1c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 29 Feb 2024 12:00:49 +0100 Subject: [PATCH 09/16] Still tracking close error on Ubuntu --- test/test_reuseaddr.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index 087b6c6ab..f11e59f1a 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -396,7 +396,9 @@ class ReuseAddr : public srt::Test // cannot close client_sock after srt_sendmsg because of issue in api.c:2346 std::cout << "[T/S] joining client async \n"; + EXPECT_EQ(srt_getsockstate(client_sock), SRTS_CONNECTED); launched.get(); + EXPECT_EQ(srt_getsockstate(client_sock), SRTS_CONNECTED); std::cout << "[T/S] closing client socket\n"; } From ca98ee3e477f860f83a1024fc97cbd522f93904d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 29 Feb 2024 12:35:18 +0100 Subject: [PATCH 10/16] Added explicit closing of client socket to avoid auto-close-broken problem --- test/test_env.h | 1 + test/test_main.cpp | 9 ++++++++- test/test_reuseaddr.cpp | 14 ++++++++++---- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/test/test_env.h b/test/test_env.h index 1db98a054..e20905351 100644 --- a/test/test_env.h +++ b/test/test_env.h @@ -82,6 +82,7 @@ class UniqueSocket { } + void close(); ~UniqueSocket(); operator int32_t() const diff --git a/test/test_main.cpp b/test/test_main.cpp index a25c8ef38..cb3b7c5c5 100644 --- a/test/test_main.cpp +++ b/test/test_main.cpp @@ -183,7 +183,14 @@ sockaddr_any CreateAddr(const std::string& name, unsigned short port, int pref_f UniqueSocket::~UniqueSocket() { - EXPECT_NE(srt_close(sock), SRT_ERROR) << lab << " CREATED:"<< f << ":" << l; + // Could be closed explicitly + if (sock != -1) + close(); +} + +void UniqueSocket::close() +{ + EXPECT_NE(srt_close(sock), SRT_ERROR) << lab << " CREATED: "<< f << ":" << l; } } diff --git a/test/test_reuseaddr.cpp b/test/test_reuseaddr.cpp index f11e59f1a..fa92bc635 100644 --- a/test/test_reuseaddr.cpp +++ b/test/test_reuseaddr.cpp @@ -390,16 +390,22 @@ class ReuseAddr : public srt::Test EXPECT_EQ(memcmp(pattern, buffer, sizeof pattern), 0); - std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << " LSN:@" << bindsock << "...\n"; + // XXX There is a possibility that a broken socket can be closed automatically, + // just the srt_close() call would simply return error in case of nonexistent + // socket. Therefore close them both at once; this problem needs to be fixed + // separately. + // + // The test only intends to send one portion of data from the client, so once + // received, the client has nothing more to do and should exit. + std::cout << "[T/S] closing client socket\n"; + client_sock.close(); + std::cout << "[T/S] closing sockets: ACP:@" << accepted_sock << "...\n"; } // client_sock closed through UniqueSocket. // cannot close client_sock after srt_sendmsg because of issue in api.c:2346 std::cout << "[T/S] joining client async \n"; - EXPECT_EQ(srt_getsockstate(client_sock), SRTS_CONNECTED); launched.get(); - EXPECT_EQ(srt_getsockstate(client_sock), SRTS_CONNECTED); - std::cout << "[T/S] closing client socket\n"; } static void shutdownListener(SRTSOCKET bindsock) From 11dd050715fbdcd060caf86c1d2bd6c9ee87d716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 18 Jul 2024 16:57:18 +0200 Subject: [PATCH 11/16] Applied fixes from code review --- srtcore/api.cpp | 15 +++++++-------- srtcore/api.h | 6 +++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index ea7451f00..33da574de 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1916,26 +1916,25 @@ int srt::CUDTUnited::close(const SRTSOCKET u) #if ENABLE_HEAVY_LOGGING // Wrapping the log into a destructor so that it // is printed AFTER the destructor of SocketKeeper. - struct ForceDestructor + struct ScopedExitLog { - CUDTSocket* ps; - ForceDestructor(): ps(NULL){} - ~ForceDestructor() + const CUDTSocket* const ps; + ScopedExitLog(const CUDTSocket* p): ps(p){} + ~ScopedExitLog() { if (ps) // Could be not acquired by SocketKeeper, occasionally { HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy()); } } - } fod; + }; #endif SocketKeeper k(*this, u, ERH_THROW); - IF_HEAVY_LOGGING(fod.ps = k.socket); + IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket)); HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy()); - int ret = close(k.socket); - return ret; + return close(k.socket); } #if ENABLE_BONDING diff --git a/srtcore/api.h b/srtcore/api.h index bac6ae026..6addfbfa2 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -337,7 +337,7 @@ class CUDTUnited int epoll_release(const int eid); #if ENABLE_BONDING - // [[using locked(m_GlobControlLock)]] + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock) CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type) { // This only ensures that the element exists. @@ -359,7 +359,7 @@ class CUDTUnited void deleteGroup(CUDTGroup* g); void deleteGroup_LOCKED(CUDTGroup* g); - // [[using locked(m_GlobControlLock)]] + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock) CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup) { for (groups_t::iterator i = m_Groups.begin(); i != m_Groups.end(); ++i) @@ -485,7 +485,7 @@ class CUDTUnited // This is only for a use together with an empty constructor. bool acquire(CUDTUnited& glob, CUDTSocket* s) { - bool caught = glob.acquireSocket(s); + const bool caught = glob.acquireSocket(s); socket = caught ? s : NULL; return caught; } From 08a401037ebd8a5d063933fc14ef22cee26e8a70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 18 Jul 2024 17:02:52 +0200 Subject: [PATCH 12/16] Fixed constness of isStillBusy --- srtcore/api.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/api.h b/srtcore/api.h index 6addfbfa2..b16784bbd 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -129,7 +129,7 @@ class CUDTSocket void apiAcquire() { ++m_iBusy; } void apiRelease() { --m_iBusy; } - int isStillBusy() + int isStillBusy() const { return m_iBusy; } From 6e2496aee9b05827afcfb1159932308d7a766ac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 18 Jul 2024 17:25:16 +0200 Subject: [PATCH 13/16] Updated failing test to track the failure cause --- test/test_fec_rebuilding.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_fec_rebuilding.cpp b/test/test_fec_rebuilding.cpp index 185b54e98..ac57e5265 100644 --- a/test/test_fec_rebuilding.cpp +++ b/test/test_fec_rebuilding.cpp @@ -544,7 +544,7 @@ TEST(TestFEC, ConnectionMess) SRTSOCKET la[] = { l }; SRTSOCKET a = srt_accept_bond(la, 1, 2000); - ASSERT_NE(a, SRT_ERROR); + ASSERT_NE(a, SRT_ERROR) << srt_getlasterror_str(); EXPECT_EQ(connect_res.get(), SRT_SUCCESS); // Now that the connection is established, check negotiated config From 326d324eb5d65196449065a2ab49bf9680684fcd Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Thu, 25 Jul 2024 17:09:51 +0200 Subject: [PATCH 14/16] Some sanitizer and warning fixes --- srtcore/core.cpp | 2 ++ srtcore/logging.h | 8 +++++--- srtcore/packet.h | 5 +++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 97736c3c4..d5e29bea2 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5844,6 +5844,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& } #if ENABLE_BONDING + m_ConnectionLock.unlock(); // The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called. // Keep the group alive for the lifetime of this function, // and do it BEFORE acquiring m_ConnectionLock to avoid @@ -5851,6 +5852,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& // This will check if a socket belongs to a group and if so // it will remember this group and keep it alive here. CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent); + m_ConnectionLock.lock(); #endif if (!prepareBuffers(NULL)) diff --git a/srtcore/logging.h b/srtcore/logging.h index 2ec5f46aa..3f4efb286 100644 --- a/srtcore/logging.h +++ b/srtcore/logging.h @@ -112,7 +112,7 @@ struct LogConfig std::ostream* log_stream; SRT_LOG_HANDLER_FN* loghandler_fn; void* loghandler_opaque; - srt::sync::Mutex mutex; + mutable srt::sync::Mutex mutex; int flags; LogConfig(const fa_bitset_t& efa, @@ -132,10 +132,10 @@ struct LogConfig } SRT_ATTR_ACQUIRE(mutex) - void lock() { mutex.lock(); } + void lock() const { mutex.lock(); } SRT_ATTR_RELEASE(mutex) - void unlock() { mutex.unlock(); } + void unlock() const { mutex.unlock(); } }; // The LogDispatcher class represents the object that is responsible for @@ -424,8 +424,10 @@ inline bool LogDispatcher::CheckEnabled() // when the enabler check is tested here. Worst case, the log // will be printed just a moment after it was turned off. const LogConfig* config = src_config; // to enforce using const operator[] + config->lock(); int configured_enabled_fa = config->enabled_fa[fa]; int configured_maxlevel = config->max_level; + config->unlock(); return configured_enabled_fa && level <= configured_maxlevel; } diff --git a/srtcore/packet.h b/srtcore/packet.h index 5094247b5..866428f3f 100644 --- a/srtcore/packet.h +++ b/srtcore/packet.h @@ -74,6 +74,11 @@ class IOVector #endif { public: + IOVector() + { + set(0, 0); + } + inline void set(void* buffer, size_t length) { #ifdef _WIN32 From 97b26d9b365ba45826e281771d22b9e984fd42ec Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Thu, 15 Aug 2024 11:14:18 +0200 Subject: [PATCH 15/16] Added guarding the socket for the whole length of a packet dispatching function --- srtcore/queue.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index fa2797721..6cb4faeb1 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1472,6 +1472,12 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CU HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id); return worker_TryAsyncRend_OrStore(id, unit, addr); } + // Although we donĀ“t have an exclusive passing here, + // we can count on that when the socket was once present in the hash, + // it will not be deleted for at least one GC cycle. But we still need + // to maintain the object existence until it's in use. + // Note that here we are out of any locks, so m_GlobControlLock can be locked. + CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->m_parent); // Found associated CUDT - process this as control or data packet // addressed to an associated socket. From abb99dc7de3a8c733c96aa640fdb90518c16b8af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 19 Aug 2024 15:18:37 +0200 Subject: [PATCH 16/16] Fixed per code review --- srtcore/api.cpp | 7 +++++++ srtcore/core.cpp | 3 --- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index ac93a6f9c..bb5dd64fe 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2729,6 +2729,13 @@ void srt::CUDTUnited::checkBrokenSockets() { CUDTSocket* ps = j->second; + // NOTE: There is still a hypothetical risk here that ps + // was made busy while the socket was already moved to m_ClosedSocket, + // if the socket was acquired through CUDTUnited::acquireSocket (that is, + // busy flag acquisition was done through the CUDTSocket* pointer rather + // than through the numeric ID). Therefore this way of busy acquisition + // should be done only if at the moment of acquisition there are certainly + // other conditions applying on the socket that prevent it from being deleted. if (ps->isStillBusy()) { HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE."); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 2478f1770..d1882b506 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8749,14 +8749,11 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr // srt_recvfile (which doesn't make any sense), you'll have a deadlock. if (m_config.bDriftTracer) { - //enterCS(m_RcvBufferLock); - #if ENABLE_BONDING ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive? const bool drift_updated = #endif m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); - //leaveCS(m_RcvBufferLock); #if ENABLE_BONDING if (drift_updated && m_parent->m_GroupOf)