From 88cf550bc3803e7c988ff7dacdc59322bc02c6f0 Mon Sep 17 00:00:00 2001 From: ruanshudong Date: Thu, 12 Oct 2023 09:30:37 +0800 Subject: [PATCH] fix epoll server queue mode in co mode --- unit-test/util/test_server.h | 3 +- unit-test/util/test_tc_epoller_server.cpp | 12 +++- util/include/util/tc_epoll_server.h | 10 ++- util/src/tc_epoll_server.cpp | 88 ++++++++++++++++------- 4 files changed, 83 insertions(+), 30 deletions(-) diff --git a/unit-test/util/test_server.h b/unit-test/util/test_server.h index 356e49a9..109f8049 100644 --- a/unit-test/util/test_server.h +++ b/unit-test/util/test_server.h @@ -120,6 +120,7 @@ class TcpQueueHandle : public TC_EpollServer::Handle shared_ptr send = data->createSendContext(); string buff(data->buffer().data(), data->buffer().size()); +// cout << "handle:" << buff << endl; buff += "-" + TC_Common::tostr(TC_Thread::CURRENT_THREADID()); @@ -234,7 +235,7 @@ class MyTcpServer void bindTcpQueue(const std::string &str) { - TC_EpollServer::BindAdapterPtr lsPtr = _epollServer->createBindAdapter("TcpQueueAdapter", str, 5); + TC_EpollServer::BindAdapterPtr lsPtr = _epollServer->createBindAdapter("TcpQueueAdapter", str, 5); lsPtr->enableQueueMode(); diff --git a/unit-test/util/test_tc_epoller_server.cpp b/unit-test/util/test_tc_epoller_server.cpp index 44f33e4a..8de9f4a8 100644 --- a/unit-test/util/test_tc_epoller_server.cpp +++ b/unit-test/util/test_tc_epoller_server.cpp @@ -126,15 +126,18 @@ TEST_F(UtilEpollServerTest, RunEnableManualListen) ASSERT_TRUE(iRet == -6); server.startManualListen(); + TC_Common::msleep(50); iRet = client.sendRecv("abc", 3, recvBuffer, recvLenth); ASSERT_TRUE(iRet == 0); ASSERT_TRUE(string(recvBuffer, recvLenth) == "abc"); server.cancelManualListen(); + TC_Common::msleep(50); iRet = client.sendRecv("abc", 3, recvBuffer, recvLenth); ASSERT_TRUE(iRet != TC_ClientSocket::EM_SUCCESS); server.startManualListen(); + TC_Common::msleep(50); iRet = client.sendRecv("abc", 3, recvBuffer, recvLenth); ASSERT_TRUE(iRet == 0); ASSERT_TRUE(string(recvBuffer, recvLenth) == "abc"); @@ -242,15 +245,18 @@ TEST_F(UtilEpollServerTest, ConnectionMax) TEST_F(UtilEpollServerTest, QueueMode) { for(int i = 0; i <= TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO; i++) +//int i = 2; { +// LOG_CONSOLE_DEBUG << "i:" << i << endl; MyTcpServer server; startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) i); vector conns; + int conn = 10; //注意服务连接数是10个 - for (int i = 0; i < 10; i++) { + for (int i = 0; i < conn; i++) { TC_TCPClient *client = new TC_TCPClient(QUEUE_HOST_EP.getHost(), QUEUE_HOST_EP.getPort(), QUEUE_HOST_EP.getTimeout()); @@ -265,7 +271,9 @@ TEST_F(UtilEpollServerTest, QueueMode) char recvBuffer[1024] = {0}; size_t recvLenth = 1024; - iRet = client->sendRecv("abc", 3, recvBuffer, recvLenth); +// LOG_CONSOLE_DEBUG << "send:" << i << endl; + + iRet = client->sendRecv("abc", 3, recvBuffer, recvLenth); ASSERT_TRUE(iRet == 0); diff --git a/util/include/util/tc_epoll_server.h b/util/include/util/tc_epoll_server.h index aa4601f1..21247544 100755 --- a/util/include/util/tc_epoll_server.h +++ b/util/include/util/tc_epoll_server.h @@ -327,7 +327,7 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface * 构造, 传入handle处理线程, * @param handleNum */ - DataBuffer(int handleNum); + DataBuffer(int handleNum, TC_EpollServer *epollServer); /** * 通知唤醒 @@ -416,6 +416,10 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface protected: + /** + * epoll server + */ + TC_EpollServer *_epollServer; /** * 接收队列数据总个数 */ @@ -441,7 +445,7 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface /** * wait time for queue */ - int64_t _iWaitTime = 3000; + int64_t _iWaitTime = 1000; }; //////////////////////////////////////////////////////////////////////////// @@ -1432,7 +1436,7 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface _iHandleNum = n; - _dataBuffer.reset(new DataBuffer(_iHandleNum)); + _dataBuffer.reset(new DataBuffer(_iHandleNum, this->_epollServer)); for (size_t i = 0; i < _iHandleNum; ++i) { diff --git a/util/src/tc_epoll_server.cpp b/util/src/tc_epoll_server.cpp index d271a9b6..cef25b2c 100644 --- a/util/src/tc_epoll_server.cpp +++ b/util/src/tc_epoll_server.cpp @@ -40,8 +40,10 @@ void TC_EpollServer::RecvContext::parseIpPort() const } } -TC_EpollServer::DataBuffer::DataBuffer(int handleNum) +TC_EpollServer::DataBuffer::DataBuffer(int handleNum, TC_EpollServer *epollServer) { + _epollServer = epollServer; + for(int i = 0; i < handleNum; i++) { _threadDataQueue.push_back(std::make_shared()); @@ -59,6 +61,7 @@ const shared_ptr &TC_EpollServer::DataBuf //如果是队列模式, 则返回handle线程对应的队列 if(isQueueMode()) { +// LOG_CONSOLE_DEBUG << "handleIndex:" << handleIndex << ", data queue index:" << index(handleIndex) << endl; return _threadDataQueue[index(handleIndex)]; } @@ -73,21 +76,38 @@ void TC_EpollServer::DataBuffer::notifyBuffer(uint32_t handleIndex) void TC_EpollServer::DataBuffer::insertRecvQueue(const shared_ptr &recv) { +// LOG_CONSOLE_DEBUG << endl; ++_iRecvBufferSize; - getDataQueue(recv->uid())->push_back(recv); + if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) + { + //协程模式本身也是队列模式 + getDataQueue(recv->threadIndex())->push_back(recv); + } + else + { + //非协程模式下, getDataQueue会判断是否是队列模式 + getDataQueue(recv->uid())->push_back(recv); + } if(_schedulers[0] != NULL) { - //存在调度器, 处于协程中 - if(isQueueMode()) - { - _schedulers[index(recv->uid())]->notify(); - } - else - { - _schedulers[index(rand())]->notify(); - } + if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) + { + _schedulers[index(recv->threadIndex())]->notify(); + } + else + { + //存在调度器, 处于协程中 + if (isQueueMode()) + { + _schedulers[index(recv->uid())]->notify(); + } + else + { + _schedulers[index(rand())]->notify(); + } + } } } @@ -100,19 +120,33 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const dequeuid())->push_back(recv); + + if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) + { + //协程模式本身也是队列模式 + getDataQueue(recv.back()->threadIndex())->push_back(recv); + } + else + { + //非协程模式下, getDataQueue会判断是否是队列模式 + getDataQueue(recv.back()->uid())->push_back(recv); + } if (_schedulers[0] != NULL) { - //存在调度器, 处于协程中 - if (isQueueMode()) - { - _schedulers[index(recv.back()->uid())]->notify(); - } - else - { - _schedulers[index(rand())]->notify(); - } + if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) + { + _schedulers[index(recv.back()->threadIndex())]->notify(); + } + else + { + //存在调度器, 处于协程中 + if (isQueueMode()) { + _schedulers[index(recv.back()->uid())]->notify(); + } else { + _schedulers[index(rand())]->notify(); + } + } } } @@ -331,6 +365,7 @@ void TC_EpollServer::Handle::handleOnceThread() while ((loop--) > 0 && _dataBuffer->pop(_handleIndex, data)) { +// LOG_CONSOLE_DEBUG << "handleIndex1:" << _handleIndex << endl; try { //上报心跳 @@ -377,6 +412,7 @@ void TC_EpollServer::Handle::handleOnceThread() } } +// LOG_CONSOLE_DEBUG << "handleIndex2:" << _handleIndex << ", size:" << _dataBuffer->size(_handleIndex) << endl; if (loop <= 0 && _dataBuffer->size(_handleIndex) > 0) { @@ -384,6 +420,8 @@ void TC_EpollServer::Handle::handleOnceThread() //NET_THREAD_QUEUE_HANDLES_THREAD模式下不需要通知,handleLoopThread循环中会自动再次处理 if (_epollServer->getOpenCoroutine() == NET_THREAD_MERGE_HANDLES_THREAD) { +// LOG_CONSOLE_DEBUG << "notifyFilter:" << _handleIndex << endl; + notifyFilter(); } } @@ -610,7 +648,9 @@ bool TC_EpollServer::Connection::handleOutputImp(const shared_ptr &data) { - TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie(); +// LOG_CONSOLE_DEBUG << endl; + + TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie(); try { @@ -1660,7 +1700,7 @@ void TC_EpollServer::NetThread::addTcpConnection(TC_EpollServer::Connection *cPt { uint32_t uid = _list->getUniqId(); - // LOG_CONSOLE_DEBUG << "uid:" << uid << endl; +// LOG_CONSOLE_DEBUG << "uid:" << uid << ", " << this->_threadIndex << endl; cPtr->initialize(_epoller, uid, this); @@ -1931,7 +1971,7 @@ bool TC_EpollServer::accept(int fd, int domain) return true; } - // LOG_CONSOLE_DEBUG << "fd:" << fd << ", cfd:" << cs.getfd() << endl; +// cou<< "fd:" << fd << ", cfd:" << cs.getfd() << endl; cs.setblock(false);