Skip to content

Commit

Permalink
fix epoll server queue mode in co mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanshudong committed Oct 12, 2023
1 parent 93339e9 commit 88cf550
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 30 deletions.
3 changes: 2 additions & 1 deletion unit-test/util/test_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class TcpQueueHandle : public TC_EpollServer::Handle
shared_ptr<TC_EpollServer::SendContext> send = data->createSendContext();

string buff(data->buffer().data(), data->buffer().size());
// cout << "handle:" << buff << endl;

buff += "-" + TC_Common::tostr(TC_Thread::CURRENT_THREADID());

Expand Down Expand Up @@ -234,7 +235,7 @@ class MyTcpServer

void bindTcpQueue(const std::string &str)
{
TC_EpollServer::BindAdapterPtr lsPtr = _epollServer->createBindAdapter<TcpQueueHandle>("TcpQueueAdapter", str, 5);
TC_EpollServer::BindAdapterPtr lsPtr = _epollServer->createBindAdapter<TcpQueueHandle>("TcpQueueAdapter", str, 5);

lsPtr->enableQueueMode();

Expand Down
12 changes: 10 additions & 2 deletions unit-test/util/test_tc_epoller_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,18 @@ TEST_F(UtilEpollServerTest, RunEnableManualListen)
ASSERT_TRUE(iRet == -6);

server.startManualListen();
TC_Common::msleep(50);
iRet = client.sendRecv("abc", 3, recvBuffer, recvLenth);
ASSERT_TRUE(iRet == 0);
ASSERT_TRUE(string(recvBuffer, recvLenth) == "abc");

server.cancelManualListen();
TC_Common::msleep(50);
iRet = client.sendRecv("abc", 3, recvBuffer, recvLenth);
ASSERT_TRUE(iRet != TC_ClientSocket::EM_SUCCESS);

server.startManualListen();
TC_Common::msleep(50);
iRet = client.sendRecv("abc", 3, recvBuffer, recvLenth);
ASSERT_TRUE(iRet == 0);
ASSERT_TRUE(string(recvBuffer, recvLenth) == "abc");
Expand Down Expand Up @@ -242,15 +245,18 @@ TEST_F(UtilEpollServerTest, ConnectionMax)
TEST_F(UtilEpollServerTest, QueueMode)
{
for(int i = 0; i <= TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO; i++)
//int i = 2;
{
// LOG_CONSOLE_DEBUG << "i:" << i << endl;
MyTcpServer server;

startServer(server, (TC_EpollServer::SERVER_OPEN_COROUTINE) i);

vector<TC_TCPClient *> conns;

int conn = 10;
//注意服务连接数是10个
for (int i = 0; i < 10; i++) {
for (int i = 0; i < conn; i++) {
TC_TCPClient *client =
new TC_TCPClient(QUEUE_HOST_EP.getHost(), QUEUE_HOST_EP.getPort(), QUEUE_HOST_EP.getTimeout());

Expand All @@ -265,7 +271,9 @@ TEST_F(UtilEpollServerTest, QueueMode)
char recvBuffer[1024] = {0};
size_t recvLenth = 1024;

iRet = client->sendRecv("abc", 3, recvBuffer, recvLenth);
// LOG_CONSOLE_DEBUG << "send:" << i << endl;

iRet = client->sendRecv("abc", 3, recvBuffer, recvLenth);

ASSERT_TRUE(iRet == 0);

Expand Down
10 changes: 7 additions & 3 deletions util/include/util/tc_epoll_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface
* 构造, 传入handle处理线程,
* @param handleNum
*/
DataBuffer(int handleNum);
DataBuffer(int handleNum, TC_EpollServer *epollServer);

/**
* 通知唤醒
Expand Down Expand Up @@ -416,6 +416,10 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface

protected:

/**
* epoll server
*/
TC_EpollServer *_epollServer;
/**
* 接收队列数据总个数
*/
Expand All @@ -441,7 +445,7 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface
/**
* wait time for queue
*/
int64_t _iWaitTime = 3000;
int64_t _iWaitTime = 1000;
};

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1432,7 +1436,7 @@ class TC_EpollServer : public TC_HandleBase, public detail::LogInterface

_iHandleNum = n;

_dataBuffer.reset(new DataBuffer(_iHandleNum));
_dataBuffer.reset(new DataBuffer(_iHandleNum, this->_epollServer));

for (size_t i = 0; i < _iHandleNum; ++i)
{
Expand Down
88 changes: 64 additions & 24 deletions util/src/tc_epoll_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ void TC_EpollServer::RecvContext::parseIpPort() const
}
}

TC_EpollServer::DataBuffer::DataBuffer(int handleNum)
TC_EpollServer::DataBuffer::DataBuffer(int handleNum, TC_EpollServer *epollServer)
{
_epollServer = epollServer;

for(int i = 0; i < handleNum; i++)
{
_threadDataQueue.push_back(std::make_shared<DataQueue>());
Expand All @@ -59,6 +61,7 @@ const shared_ptr<TC_EpollServer::DataBuffer::DataQueue> &TC_EpollServer::DataBuf
//如果是队列模式, 则返回handle线程对应的队列
if(isQueueMode())
{
// LOG_CONSOLE_DEBUG << "handleIndex:" << handleIndex << ", data queue index:" << index(handleIndex) << endl;
return _threadDataQueue[index(handleIndex)];
}

Expand All @@ -73,21 +76,38 @@ void TC_EpollServer::DataBuffer::notifyBuffer(uint32_t handleIndex)

void TC_EpollServer::DataBuffer::insertRecvQueue(const shared_ptr<RecvContext> &recv)
{
// LOG_CONSOLE_DEBUG << endl;
++_iRecvBufferSize;

getDataQueue(recv->uid())->push_back(recv);
if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO)
{
//协程模式本身也是队列模式
getDataQueue(recv->threadIndex())->push_back(recv);
}
else
{
//非协程模式下, getDataQueue会判断是否是队列模式
getDataQueue(recv->uid())->push_back(recv);
}

if(_schedulers[0] != NULL)
{
//存在调度器, 处于协程中
if(isQueueMode())
{
_schedulers[index(recv->uid())]->notify();
}
else
{
_schedulers[index(rand())]->notify();
}
if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO)
{
_schedulers[index(recv->threadIndex())]->notify();
}
else
{
//存在调度器, 处于协程中
if (isQueueMode())
{
_schedulers[index(recv->uid())]->notify();
}
else
{
_schedulers[index(rand())]->notify();
}
}
}
}

Expand All @@ -100,19 +120,33 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const deque<shared_ptr<RecvCont

_iRecvBufferSize += recv.size();

getDataQueue(recv.back()->uid())->push_back(recv);

if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO)
{
//协程模式本身也是队列模式
getDataQueue(recv.back()->threadIndex())->push_back(recv);
}
else
{
//非协程模式下, getDataQueue会判断是否是队列模式
getDataQueue(recv.back()->uid())->push_back(recv);
}

if (_schedulers[0] != NULL)
{
//存在调度器, 处于协程中
if (isQueueMode())
{
_schedulers[index(recv.back()->uid())]->notify();
}
else
{
_schedulers[index(rand())]->notify();
}
if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO)
{
_schedulers[index(recv.back()->threadIndex())]->notify();
}
else
{
//存在调度器, 处于协程中
if (isQueueMode()) {
_schedulers[index(recv.back()->uid())]->notify();
} else {
_schedulers[index(rand())]->notify();
}
}
}
}

Expand Down Expand Up @@ -331,6 +365,7 @@ void TC_EpollServer::Handle::handleOnceThread()

while ((loop--) > 0 && _dataBuffer->pop(_handleIndex, data))
{
// LOG_CONSOLE_DEBUG << "handleIndex1:" << _handleIndex << endl;
try
{
//上报心跳
Expand Down Expand Up @@ -377,13 +412,16 @@ void TC_EpollServer::Handle::handleOnceThread()
}
}

// LOG_CONSOLE_DEBUG << "handleIndex2:" << _handleIndex << ", size:" << _dataBuffer->size(_handleIndex) << endl;

if (loop <= 0 && _dataBuffer->size(_handleIndex) > 0)
{
//NET_THREAD_MERGE_HANDLES_THREAD模式下,_dataBuffer中还有数据,需要通知再次处理
//NET_THREAD_QUEUE_HANDLES_THREAD模式下不需要通知,handleLoopThread循环中会自动再次处理
if (_epollServer->getOpenCoroutine() == NET_THREAD_MERGE_HANDLES_THREAD)
{
// LOG_CONSOLE_DEBUG << "notifyFilter:" << _handleIndex << endl;

notifyFilter();
}
}
Expand Down Expand Up @@ -610,7 +648,9 @@ bool TC_EpollServer::Connection::handleOutputImp(const shared_ptr<TC_Epoller::Ep

bool TC_EpollServer::Connection::handleInputImp(const shared_ptr<TC_Epoller::EpollInfo> &data)
{
TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie();
// LOG_CONSOLE_DEBUG << endl;

TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie();

try
{
Expand Down Expand Up @@ -1660,7 +1700,7 @@ void TC_EpollServer::NetThread::addTcpConnection(TC_EpollServer::Connection *cPt
{
uint32_t uid = _list->getUniqId();

// LOG_CONSOLE_DEBUG << "uid:" << uid << endl;
// LOG_CONSOLE_DEBUG << "uid:" << uid << ", " << this->_threadIndex << endl;

cPtr->initialize(_epoller, uid, this);

Expand Down Expand Up @@ -1931,7 +1971,7 @@ bool TC_EpollServer::accept(int fd, int domain)
return true;
}

// LOG_CONSOLE_DEBUG << "fd:" << fd << ", cfd:" << cs.getfd() << endl;
// cou<< "fd:" << fd << ", cfd:" << cs.getfd() << endl;

cs.setblock(false);

Expand Down

0 comments on commit 88cf550

Please sign in to comment.