From 4167fc11f901564da855c00954ab42019f3aa420 Mon Sep 17 00:00:00 2001 From: ruanshudong Date: Thu, 12 Oct 2023 20:37:00 +0800 Subject: [PATCH] fix epoll server fd leak in co mode --- util/src/tc_coroutine.cpp | 11 +- util/src/tc_epoll_server.cpp | 3186 +++++++++++++++++----------------- util/src/tc_epoller.cpp | 1 + 3 files changed, 1610 insertions(+), 1588 deletions(-) diff --git a/util/src/tc_coroutine.cpp b/util/src/tc_coroutine.cpp index 90b67538..3da86e16 100755 --- a/util/src/tc_coroutine.cpp +++ b/util/src/tc_coroutine.cpp @@ -326,8 +326,6 @@ TC_CoroutineScheduler::TC_CoroutineScheduler() , _currentCoro(NULL) , _all_coro(NULL) { - // LOG_CONSOLE_DEBUG << endl; - _epoller = new TC_Epoller(); _epoller->create(10240); @@ -335,7 +333,6 @@ TC_CoroutineScheduler::TC_CoroutineScheduler() TC_CoroutineScheduler::~TC_CoroutineScheduler() { - // LOG_CONSOLE_DEBUG << endl; if(_epoller) { delete _epoller; @@ -666,6 +663,12 @@ void TC_CoroutineScheduler::terminate() assert(_epoller); _epoller->terminate(); +// +// if(_epoller) +// { +// delete _epoller; +// _epoller = NULL; +// } } uint32_t TC_CoroutineScheduler::generateId() @@ -816,6 +819,8 @@ void TC_Coroutine::run() handleCoro(); destroy(); + +// TC_CoroutineScheduler::reset(); } void TC_Coroutine::terminate() diff --git a/util/src/tc_epoll_server.cpp b/util/src/tc_epoll_server.cpp index a0f5c9dc..75ac44c4 100644 --- a/util/src/tc_epoll_server.cpp +++ b/util/src/tc_epoll_server.cpp @@ -34,50 +34,50 @@ namespace tars void TC_EpollServer::RecvContext::parseIpPort() const { - if (_ip.empty()) - { - TC_Socket::parseAddr(_addr, _ip, _port); - } + if (_ip.empty()) + { + TC_Socket::parseAddr(_addr, _ip, _port); + } } TC_EpollServer::DataBuffer::DataBuffer(int handleNum, TC_EpollServer *epollServer) { _epollServer = epollServer; - for(int i = 0; i < handleNum; i++) - { - _threadDataQueue.push_back(std::make_shared()); - } + for(int i = 0; i < handleNum; i++) + { + _threadDataQueue.push_back(std::make_shared()); + } - _schedulers.resize(handleNum); - for(size_t i = 0; i < _schedulers.size(); i++) - { - _schedulers[i] = NULL; - } + _schedulers.resize(handleNum); + for(size_t i = 0; i < _schedulers.size(); i++) + { + _schedulers[i] = NULL; + } } const shared_ptr &TC_EpollServer::DataBuffer::getDataQueue(size_t handleIndex) { - //如果是队列模式, 则返回handle线程对应的队列 - if(isQueueMode()) - { + //如果是队列模式, 则返回handle线程对应的队列 + if(isQueueMode()) + { // LOG_CONSOLE_DEBUG << "handleIndex:" << handleIndex << ", data queue index:" << index(handleIndex) << endl; - return _threadDataQueue[index(handleIndex)]; - } + return _threadDataQueue[index(handleIndex)]; + } - //否则返回第一个队列(所有人共享) - return _threadDataQueue[0]; + //否则返回第一个队列(所有人共享) + return _threadDataQueue[0]; } void TC_EpollServer::DataBuffer::notifyBuffer(uint32_t handleIndex) { - getDataQueue(handleIndex)->notify(); + getDataQueue(handleIndex)->notify(); } void TC_EpollServer::DataBuffer::insertRecvQueue(const shared_ptr &recv) { // LOG_CONSOLE_DEBUG << endl; - ++_iRecvBufferSize; + ++_iRecvBufferSize; if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) { @@ -90,8 +90,8 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const shared_ptr & getDataQueue(recv->uid())->push_back(recv); } - if(_schedulers[0] != NULL) - { + if(_schedulers[0] != NULL) + { if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == 
TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) { _schedulers[index(recv->threadIndex())]->notify(); @@ -108,17 +108,17 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const shared_ptr & _schedulers[index(rand())]->notify(); } } - } + } } void TC_EpollServer::DataBuffer::insertRecvQueue(const deque> &recv) { - if (recv.empty()) - { - return; - } + if (recv.empty()) + { + return; + } - _iRecvBufferSize += recv.size(); + _iRecvBufferSize += recv.size(); if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) { @@ -131,8 +131,8 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const dequeuid())->push_back(recv); } - if (_schedulers[0] != NULL) - { + if (_schedulers[0] != NULL) + { if(_epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO) { _schedulers[index(recv.back()->threadIndex())]->notify(); @@ -146,56 +146,56 @@ void TC_EpollServer::DataBuffer::insertRecvQueue(const dequenotify(); } } - } + } } bool TC_EpollServer::DataBuffer::wait(uint32_t handleIndex) { - return getDataQueue(handleIndex)->wait(_iWaitTime); + return getDataQueue(handleIndex)->wait(_iWaitTime); } bool TC_EpollServer::DataBuffer::pop(uint32_t handleIndex, shared_ptr &data) { - bool bRet = getDataQueue(handleIndex)->pop_front(data); + bool bRet = getDataQueue(handleIndex)->pop_front(data); - if (!bRet) - { - return bRet; - } + if (!bRet) + { + return bRet; + } - --_iRecvBufferSize; + --_iRecvBufferSize; - return bRet; + return bRet; } void TC_EpollServer::DataBuffer::registerScheduler(uint32_t handleIndex, const shared_ptr &scheduler) { - assert(handleIndex < _schedulers.size()); + assert(handleIndex < _schedulers.size()); - // LOG_CONSOLE_DEBUG << handleIndex << ", " << _schedulers.size() << endl; + // LOG_CONSOLE_DEBUG << handleIndex << ", " << _schedulers.size() << endl; - _schedulers[handleIndex] = scheduler; + _schedulers[handleIndex] = scheduler; } void TC_EpollServer::DataBuffer::unregisterScheduler(uint32_t handleIndex) { - // LOG_CONSOLE_DEBUG << handleIndex << ", " << _schedulers.size() << endl; - assert(handleIndex < _schedulers.size()); + // LOG_CONSOLE_DEBUG << handleIndex << ", " << _schedulers.size() << endl; + assert(handleIndex < _schedulers.size()); - _schedulers[handleIndex] = NULL; + _schedulers[handleIndex] = NULL; } const shared_ptr &TC_EpollServer::DataBuffer::getScheduler(uint32_t handleIndex) { - assert(handleIndex < _schedulers.size()); + assert(handleIndex < _schedulers.size()); - return _schedulers[handleIndex]; + return _schedulers[handleIndex]; } ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // handle的实现 TC_EpollServer::Handle::Handle() - : _epollServer(NULL) + : _epollServer(NULL) { } @@ -205,7 +205,7 @@ TC_EpollServer::Handle::~Handle() void TC_EpollServer::Handle::setWaitTime(uint32_t iWaitTime) { - this->getBindAdapter()->getDataBuffer()->setWaitTime(iWaitTime); + this->getBindAdapter()->getDataBuffer()->setWaitTime(iWaitTime); } void TC_EpollServer::Handle::handleClose(const shared_ptr & data) @@ -214,216 +214,216 @@ void TC_EpollServer::Handle::handleClose(const shared_ptr & data) void TC_EpollServer::Handle::handleTimeout(const shared_ptr & data) { - _epollServer->error("[Handle::handleTimeout] queue timeout, close [" + data->ip() + ":" + TC_Common::tostr(data->port()) + "]."); + 
_epollServer->error("[Handle::handleTimeout] queue timeout, close [" + data->ip() + ":" + TC_Common::tostr(data->port()) + "]."); - close(data); + close(data); } void TC_EpollServer::Handle::handleOverload(const shared_ptr & data) { - auto adapter = data->adapter(); + auto adapter = data->adapter(); - if(adapter) - { - _epollServer->error("[Handle::handleOverload] adapter '" + adapter->getName() + "',over load:" - + TC_Common::tostr(adapter->getRecvBufferSize()) + ">" - + TC_Common::tostr(adapter->getQueueCapacity()) + "."); - } + if(adapter) + { + _epollServer->error("[Handle::handleOverload] adapter '" + adapter->getName() + "',over load:" + + TC_Common::tostr(adapter->getRecvBufferSize()) + ">" + + TC_Common::tostr(adapter->getQueueCapacity()) + "."); + } - close(data); + close(data); } void TC_EpollServer::Handle::terminate() { // _terminate = true; - notifyFilter(); + notifyFilter(); } void TC_EpollServer::Handle::handleOnceCoroutine() { - const shared_ptr &scheduler = TC_CoroutineScheduler::scheduler(); - - assert(scheduler); - - //上报心跳 - heartbeat(); - - //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 - handleAsyncResponse(); - handleCustomMessage(true); - - bool bYield = false; - - shared_ptr data; - try - { - int loop = 1000; - - while ((loop--) > 0 && !_epollServer->isTerminate()) - { - if ((scheduler->getFreeSize() > 0) && _dataBuffer->pop(_handleIndex, data)) - { - bYield = true; - - //上报心跳 - heartbeat(); - - //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 - handleAsyncResponse(); - - if (data->isOverload()) - { - //数据已超载 overload - handleOverload(data); - } - else if (data->isClosed()) - { - //关闭连接的通知消息 - handleClose(data); - } - else if ((TNOWMS - data->recvTimeStamp()) > (uint64_t)_bindAdapter->getQueueTimeout()) - { - //数据在队列中已经超时了 - handleTimeout(data); - } - else - { - uint32_t iRet = scheduler->go(std::bind(&Handle::handle, this, data)); - if (iRet == 0) - { + const shared_ptr &scheduler = TC_CoroutineScheduler::scheduler(); + + assert(scheduler); + + //上报心跳 + heartbeat(); + + //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 + handleAsyncResponse(); + handleCustomMessage(true); + + bool bYield = false; + + shared_ptr data; + try + { + int loop = 1000; + + while ((loop--) > 0 && !_epollServer->isTerminate()) + { + if ((scheduler->getFreeSize() > 0) && _dataBuffer->pop(_handleIndex, data)) + { + bYield = true; + + //上报心跳 + heartbeat(); + + //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 + handleAsyncResponse(); + + if (data->isOverload()) + { + //数据已超载 overload + handleOverload(data); + } + else if (data->isClosed()) + { + //关闭连接的通知消息 + handleClose(data); + } + else if ((TNOWMS - data->recvTimeStamp()) > (uint64_t)_bindAdapter->getQueueTimeout()) + { + //数据在队列中已经超时了 + handleTimeout(data); + } + else + { + uint32_t iRet = scheduler->go(std::bind(&Handle::handle, this, data)); + if (iRet == 0) + { // LOG_CONSOLE_DEBUG << "handleOverload" << endl; - handleOverload(data); - } - } - handleCustomMessage(false); - } - else - { - bYield = false; - break; - } - } - - //循环100次, 没有任何消息需要处理, yield一下 - if (loop == 0) { - bYield = false; - } - } - catch (exception& ex) - { - if (data) - { - close(data); - } - - getEpollServer()->error("[TC_EpollServer::Handle::handleOnceCoroutine] error:" + string(ex.what())); - } - catch (...) 
- { - if (data) - { - close(data); - } - - getEpollServer()->error("[TC_EpollServer::Handle::handleOnceCoroutine] unknown error"); - } - - if (!bYield) - { - scheduler->yield(); - } - else if(scheduler->isMainCoroutine()) - { - scheduler->notify(); - } + handleOverload(data); + } + } + handleCustomMessage(false); + } + else + { + bYield = false; + break; + } + } + + //循环100次, 没有任何消息需要处理, yield一下 + if (loop == 0) { + bYield = false; + } + } + catch (exception& ex) + { + if (data) + { + close(data); + } + + getEpollServer()->error("[TC_EpollServer::Handle::handleOnceCoroutine] error:" + string(ex.what())); + } + catch (...) + { + if (data) + { + close(data); + } + + getEpollServer()->error("[TC_EpollServer::Handle::handleOnceCoroutine] unknown error"); + } + + if (!bYield) + { + scheduler->yield(); + } + else if(scheduler->isMainCoroutine()) + { + scheduler->notify(); + } // LOG_CONSOLE("handleOnceCoroutine ok"); } void TC_EpollServer::Handle::handleCoroutine() { - for(;;) - { - handleOnceCoroutine(); - } + for(;;) + { + handleOnceCoroutine(); + } } void TC_EpollServer::Handle::handleOnceThread() { - //上报心跳 - heartbeat(); + //上报心跳 + heartbeat(); - //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 - handleAsyncResponse(); - handleCustomMessage(true); + //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 + handleAsyncResponse(); + handleCustomMessage(true); - shared_ptr data; + shared_ptr data; - int loop = 100; + int loop = 100; - while ((loop--) > 0 && _dataBuffer->pop(_handleIndex, data)) - { + while ((loop--) > 0 && _dataBuffer->pop(_handleIndex, data)) + { // LOG_CONSOLE_DEBUG << "handleIndex1:" << _handleIndex << endl; - try - { - //上报心跳 - heartbeat(); - - //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 - handleAsyncResponse(); - - if (data->isOverload()) - { - //数据已超载 overload - handleOverload(data); - } else if (data->isClosed()) - { - //关闭连接的通知消息 - handleClose(data); - } else if ((TNOWMS - data->recvTimeStamp()) > (uint64_t) _bindAdapter->getQueueTimeout()) - { - //数据在队列中已经超时了 - handleTimeout(data); - } else - { - handle(data); - } - handleCustomMessage(false); - } - catch (exception &ex) - { - if (data) - { - close(data); - } - - getEpollServer()->error("[Handle::handleImp] error:" + string(ex.what())); - } - catch (...) - { - if (data) - { - close(data); - } - - getEpollServer()->error("[Handle::handleImp] unknown error"); - } - } + try + { + //上报心跳 + heartbeat(); + + //为了实现所有主逻辑的单线程化,在每次循环中给业务处理自有消息的机会 + handleAsyncResponse(); + + if (data->isOverload()) + { + //数据已超载 overload + handleOverload(data); + } else if (data->isClosed()) + { + //关闭连接的通知消息 + handleClose(data); + } else if ((TNOWMS - data->recvTimeStamp()) > (uint64_t) _bindAdapter->getQueueTimeout()) + { + //数据在队列中已经超时了 + handleTimeout(data); + } else + { + handle(data); + } + handleCustomMessage(false); + } + catch (exception &ex) + { + if (data) + { + close(data); + } + + getEpollServer()->error("[Handle::handleImp] error:" + string(ex.what())); + } + catch (...) 
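Note: the fd leak named in the commit title is fixed in handleLoopCoroutine just below. Each handle thread calls TC_CoroutineScheduler::create(), which caches a scheduler per thread, and the scheduler's constructor allocates a TC_Epoller (an epoll fd). The patch adds TC_CoroutineScheduler::reset() after the run loop exits so the cached pointer is released and the fd closed. A sketch of that lifetime, assuming the cache is a thread-local shared_ptr and using simplified stand-ins for the real classes:

    #include <memory>

    struct Epoller { };  // stand-in: owns an epoll fd, closed in its destructor

    class Scheduler {
    public:
        static std::shared_ptr<Scheduler>& slot() {      // per-thread cache
            static thread_local std::shared_ptr<Scheduler> s;
            return s;
        }
        static std::shared_ptr<Scheduler> create() {
            if (!slot()) slot() = std::make_shared<Scheduler>();
            return slot();
        }
        // Without this, the cache keeps the Epoller (and its fd) alive after
        // the handle loop returns -- the leak this commit plugs.
        static void reset() { slot().reset(); }

        std::unique_ptr<Epoller> epoller = std::make_unique<Epoller>();
    };

    int main() {
        auto sched = Scheduler::create(); // scheduler + epoll fd for this thread
        sched.reset();                    // local ref dropped; cache still holds one
        Scheduler::reset();               // now the Epoller is actually destroyed
    }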
+ { + if (data) + { + close(data); + } + + getEpollServer()->error("[Handle::handleImp] unknown error"); + } + } // LOG_CONSOLE_DEBUG << "handleIndex2:" << _handleIndex << ", size:" << _dataBuffer->size(_handleIndex) << endl; - if (loop <= 0 && _dataBuffer->size(_handleIndex) > 0) - { - //NET_THREAD_MERGE_HANDLES_THREAD模式下,_dataBuffer中还有数据,需要通知再次处理 - //NET_THREAD_QUEUE_HANDLES_THREAD模式下不需要通知,handleLoopThread循环中会自动再次处理 - if (_epollServer->getOpenCoroutine() == NET_THREAD_MERGE_HANDLES_THREAD) - { + if (loop <= 0 && _dataBuffer->size(_handleIndex) > 0) + { + //NET_THREAD_MERGE_HANDLES_THREAD模式下,_dataBuffer中还有数据,需要通知再次处理 + //NET_THREAD_QUEUE_HANDLES_THREAD模式下不需要通知,handleLoopThread循环中会自动再次处理 + if (_epollServer->getOpenCoroutine() == NET_THREAD_MERGE_HANDLES_THREAD) + { // LOG_CONSOLE_DEBUG << "notifyFilter:" << _handleIndex << endl; - notifyFilter(); - } - } + notifyFilter(); + } + } // if (loop <= 0) // { @@ -437,212 +437,213 @@ void TC_EpollServer::Handle::handleOnceThread() bool TC_EpollServer::Handle::isReady() const { - if(this->_epollServer->getOpenCoroutine() == NET_THREAD_QUEUE_HANDLES_THREAD) - { - return true; - } - else if(this->_epollServer->getOpenCoroutine() == NET_THREAD_QUEUE_HANDLES_CO) - { - return _scheduler && _scheduler->isReady(); - } + if(this->_epollServer->getOpenCoroutine() == NET_THREAD_QUEUE_HANDLES_THREAD) + { + return true; + } + else if(this->_epollServer->getOpenCoroutine() == NET_THREAD_QUEUE_HANDLES_CO) + { + return _scheduler && _scheduler->isReady(); + } - assert(false); + assert(false); - return false; + return false; } void TC_EpollServer::Handle::handleLoopCoroutine() { - //这种模式下, 为了保证当前线程能做和通信器结合, 必须也等在epoll上 - //因此当网络层收到数据, 写对队列后, 需要唤醒某一个handle的epoll, 从而唤醒某个协程 - _scheduler = TC_CoroutineScheduler::create(); - _scheduler->setPoolStackSize(this->_epollServer->getCoroutinePoolSize(), this->_epollServer->getCoroutineStackSize()); - _scheduler->getEpoller()->setName("epoller-handle"); + //这种模式下, 为了保证当前线程能做和通信器结合, 必须也等在epoll上 + //因此当网络层收到数据, 写对队列后, 需要唤醒某一个handle的epoll, 从而唤醒某个协程 + _scheduler = TC_CoroutineScheduler::create(); + _scheduler->setPoolStackSize(this->_epollServer->getCoroutinePoolSize(), this->_epollServer->getCoroutineStackSize()); + _scheduler->getEpoller()->setName("epoller-handle"); - _dataBuffer->registerScheduler(_handleIndex, _scheduler); + _dataBuffer->registerScheduler(_handleIndex, _scheduler); - initialize(); + initialize(); - _scheduler->go(std::bind(&Handle::handleCoroutine, this)); + _scheduler->go(std::bind(&Handle::handleCoroutine, this)); - _epollServer->notifyThreadReady(); + _epollServer->notifyThreadReady(); - _scheduler->run(); + _scheduler->run(); - _dataBuffer->unregisterScheduler(_handleIndex); + _dataBuffer->unregisterScheduler(_handleIndex); + TC_CoroutineScheduler::reset(); } void TC_EpollServer::Handle::handleLoopThread() { - initialize(); + initialize(); - _epollServer->notifyThreadReady(); + _epollServer->notifyThreadReady(); - while (!_epollServer->isTerminate()) - { - _dataBuffer->wait(_handleIndex); + while (!_epollServer->isTerminate()) + { + _dataBuffer->wait(_handleIndex); - handleOnceThread(); - } + handleOnceThread(); + } } bool TC_EpollServer::Handle::allAdapterIsEmpty() { - if (_dataBuffer->getRecvBufferSize() > 0) { - return false; - } + if (_dataBuffer->getRecvBufferSize() > 0) { + return false; + } - return true; + return true; } bool TC_EpollServer::Handle::allFilterIsEmpty() { - return true; + return true; } void TC_EpollServer::Handle::notifyFilter() { - shared_ptr scheduler = 
_dataBuffer->getScheduler(this->_handleIndex); - - if (scheduler) - { - if(this->_epollServer->isTerminate()) - { - scheduler->terminate(); - } - else - { - scheduler->notify(); - } - } - else if(_netThread) - { - _netThread->notify(); - } - else - { - _dataBuffer->notifyBuffer(_handleIndex); - } + shared_ptr scheduler = _dataBuffer->getScheduler(this->_handleIndex); + + if (scheduler) + { + if(this->_epollServer->isTerminate()) + { + scheduler->terminate(); + } + else + { + scheduler->notify(); + } + } + else if(_netThread) + { + _netThread->notify(); + } + else + { + _dataBuffer->notifyBuffer(_handleIndex); + } } /////////////////////////////////////////////////////////////////////////////////////////////////////////////// // 服务连接 TC_EpollServer::Connection::Connection(const shared_ptr &connList, BindAdapter *pBindAdapter, int fd, const string& ip, uint16_t port, detail::LogInterface* logger) - : _connList(connList) - , _logger(logger) - , _pBindAdapter(pBindAdapter) - , _uid(0) - , _fd(fd) - , _ip(ip) - , _port(port) - , _iHeaderLen(0) - , _bClose(false) - , _bEmptyConn(true) + : _connList(connList) + , _logger(logger) + , _pBindAdapter(pBindAdapter) + , _uid(0) + , _fd(fd) + , _ip(ip) + , _port(port) + , _iHeaderLen(0) + , _bClose(false) + , _bEmptyConn(true) { - assert(fd != -1); + assert(fd != -1); - _iLastRefreshTime = TNOW; + _iLastRefreshTime = TNOW; } TC_EpollServer::Connection::Connection(const shared_ptr &connList, BindAdapter *pBindAdapter, int fd, detail::LogInterface *logger) - : _connList(connList) - , _logger(logger) - , _pBindAdapter(pBindAdapter) - , _uid(0) - , _fd(fd) - , _port(0) - , _iHeaderLen(0) - , _bClose(false) - , _bEmptyConn(false) /*udp is always false*/ + : _connList(connList) + , _logger(logger) + , _pBindAdapter(pBindAdapter) + , _uid(0) + , _fd(fd) + , _port(0) + , _iHeaderLen(0) + , _bClose(false) + , _bEmptyConn(false) /*udp is always false*/ { - // LOG_CONSOLE_DEBUG << endl; - _iLastRefreshTime = TNOW; + // LOG_CONSOLE_DEBUG << endl; + _iLastRefreshTime = TNOW; } TC_EpollServer::Connection::~Connection() { - // LOG_CONSOLE_DEBUG << endl; + // LOG_CONSOLE_DEBUG << endl; } void TC_EpollServer::Connection::initialize(TC_Epoller *epoller, unsigned int uid, NetThread *netThread) { - _uid = uid; + _uid = uid; - _netThread = netThread; + _netThread = netThread; - const TC_Endpoint &ep = _pBindAdapter->getEndpoint(); + const TC_Endpoint &ep = _pBindAdapter->getEndpoint(); #if TARS_SSL - if (ep.isSSL()) - { - _trans.reset(new TC_SSLTransceiver(epoller, ep)); - } - else if (ep.isTcp()) + if (ep.isSSL()) +{ + _trans.reset(new TC_SSLTransceiver(epoller, ep)); +} +else if (ep.isTcp()) +{ + _trans.reset(new TC_TCPTransceiver(epoller, ep)); +} +else +{ + _trans.reset(new TC_UDPTransceiver(epoller, ep)); +} +#else + if (ep.isUdp()) { - _trans.reset(new TC_TCPTransceiver(epoller, ep)); + _trans.reset(new TC_UDPTransceiver(epoller, ep)); } else { - _trans.reset(new TC_UDPTransceiver(epoller, ep)); + _trans.reset(new TC_TCPTransceiver(epoller, ep)); } -#else - if (ep.isUdp()) - { - _trans.reset(new TC_UDPTransceiver(epoller, ep)); - } - else - { - _trans.reset(new TC_TCPTransceiver(epoller, ep)); - } #endif - _trans->initializeServer(std::bind(&Connection::onCloseCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), - std::bind(&Connection::onRequestCallback, this, std::placeholders::_1), - std::bind(&Connection::onParserCallback, this, std::placeholders::_1, std::placeholders::_2), - std::bind(&Connection::onOpensslCallback, this, 
std::placeholders::_1), - TC_Transceiver::oncompletepackage_callback(), - std::bind(&Connection::onCompleteNetworkCallback, this, std::placeholders::_1)); + _trans->initializeServer(std::bind(&Connection::onCloseCallback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), + std::bind(&Connection::onRequestCallback, this, std::placeholders::_1), + std::bind(&Connection::onParserCallback, this, std::placeholders::_1, std::placeholders::_2), + std::bind(&Connection::onOpensslCallback, this, std::placeholders::_1), + TC_Transceiver::oncompletepackage_callback(), + std::bind(&Connection::onCompleteNetworkCallback, this, std::placeholders::_1)); - _trans->setServerAuthCallback(_pBindAdapter->_onVerifyCallback); + _trans->setServerAuthCallback(_pBindAdapter->_onVerifyCallback); - _trans->getRecvBuffer().setConnection(this); + _trans->getRecvBuffer().setConnection(this); } bool TC_EpollServer::Connection::handleOutputImp(const shared_ptr &data) { // LOG_CONSOLE_DEBUG << endl; - TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie(); + TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie(); - int ret = 0; - try - { - ret = sendBuffer(); - } - catch(const std::exception& ex) - { - // LOG_CONSOLE_DEBUG << ex.what() << endl; - ret = -1; - _logger->error(ex.what()); - } + int ret = 0; + try + { + ret = sendBuffer(); + } + catch(const std::exception& ex) + { + // LOG_CONSOLE_DEBUG << ex.what() << endl; + ret = -1; + _logger->error(ex.what()); + } - if (ret < 0) - { - netThread->delConnection(this, true, (ret == -1) ? EM_CLIENT_CLOSE : EM_SERVER_CLOSE); + if (ret < 0) + { + netThread->delConnection(this, true, (ret == -1) ? EM_CLIENT_CLOSE : EM_SERVER_CLOSE); - return false; - } + return false; + } - auto cl = _connList.lock(); - if(cl) - { - cl->refresh(getId(), getTimeout() + TNOW); - } + auto cl = _connList.lock(); + if(cl) + { + cl->refresh(getId(), getTimeout() + TNOW); + } - return true; + return true; } bool TC_EpollServer::Connection::handleInputImp(const shared_ptr &data) @@ -651,216 +652,216 @@ bool TC_EpollServer::Connection::handleInputImp(const shared_ptrcookie(); - try - { - bool bRet = _trans->doResponse(); - if(false == bRet) - { - netThread->delConnection(this, true, EM_CLIENT_CLOSE); - return false; - } - } - catch(const std::exception& ex) - { - // LOG_CONSOLE_DEBUG << ex.what() << endl; - _logger->error(ex.what()); - netThread->delConnection(this, true, EM_CLIENT_CLOSE); - return false; - } - - auto cl = _connList.lock(); - if(cl) - { - cl->refresh(getId(), getTimeout() + TNOW); - } - - return true; + try + { + bool bRet = _trans->doResponse(); + if(false == bRet) + { + netThread->delConnection(this, true, EM_CLIENT_CLOSE); + return false; + } + } + catch(const std::exception& ex) + { + // LOG_CONSOLE_DEBUG << ex.what() << endl; + _logger->error(ex.what()); + netThread->delConnection(this, true, EM_CLIENT_CLOSE); + return false; + } + + auto cl = _connList.lock(); + if(cl) + { + cl->refresh(getId(), getTimeout() + TNOW); + } + + return true; } bool TC_EpollServer::Connection::handleCloseImp(const shared_ptr &data) { - // LOG_CONSOLE_DEBUG << endl; - TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie(); + // LOG_CONSOLE_DEBUG << endl; + TC_EpollServer::NetThread *netThread = (TC_EpollServer::NetThread *)data->cookie(); - netThread->delConnection(this, true, EM_SERVER_CLOSE); - return false; + netThread->delConnection(this, true, EM_SERVER_CLOSE); + return false; } void 
TC_EpollServer::Connection::registerEvent(TC_EpollServer::NetThread *netThread) { - shared_ptr epollInfo = _trans->bindFd(_fd); + shared_ptr epollInfo = _trans->bindFd(_fd); - epollInfo->cookie(netThread); + epollInfo->cookie(netThread); - map callbacks; + map callbacks; - callbacks[EPOLLIN] = std::bind(&TC_EpollServer::Connection::handleInputImp, this, std::placeholders::_1); - callbacks[EPOLLOUT] = std::bind(&TC_EpollServer::Connection::handleOutputImp, this, std::placeholders::_1); - callbacks[EPOLLERR] = std::bind(&TC_EpollServer::Connection::handleCloseImp, this, std::placeholders::_1); + callbacks[EPOLLIN] = std::bind(&TC_EpollServer::Connection::handleInputImp, this, std::placeholders::_1); + callbacks[EPOLLOUT] = std::bind(&TC_EpollServer::Connection::handleOutputImp, this, std::placeholders::_1); + callbacks[EPOLLERR] = std::bind(&TC_EpollServer::Connection::handleCloseImp, this, std::placeholders::_1); - // 回调 - if (this->isTcp() && this->_pBindAdapter->_epollServer->_acceptFunc != NULL) - { - try - { - this->_pBindAdapter->_epollServer->_acceptFunc(this); - } - catch(exception &ex) - { - _logger->error(string("accept callback error:") + ex.what()); - } - } + // 回调 + if (this->isTcp() && this->_pBindAdapter->_epollServer->_acceptFunc != NULL) + { + try + { + this->_pBindAdapter->_epollServer->_acceptFunc(this); + } + catch(exception &ex) + { + _logger->error(string("accept callback error:") + ex.what()); + } + } - epollInfo->registerCallback(callbacks, EPOLLIN | EPOLLOUT); + epollInfo->registerCallback(callbacks, EPOLLIN | EPOLLOUT); - //注意registerCallback, 网络线程马上关注网络事件, 并执行回调, 有可能由于_trans被释放了! + //注意registerCallback, 网络线程马上关注网络事件, 并执行回调, 有可能由于_trans被释放了! } void TC_EpollServer::Connection::close() { - _trans->close(); - // LOG_CONSOLE_DEBUG << _trans.use_count() << endl; + _trans->close(); + // LOG_CONSOLE_DEBUG << _trans.use_count() << endl; } void TC_EpollServer::Connection::onCloseCallback(TC_Transceiver *trans, TC_Transceiver::CloseReason reason, const string &err) { - _pBindAdapter->decreaseSendBufferSize(_messages.size()); + _pBindAdapter->decreaseSendBufferSize(_messages.size()); - if (trans->getEndpoint().isTcp() && trans->isValid()) - { - _pBindAdapter->decreaseSendBufferSize(); - } + if (trans->getEndpoint().isTcp() && trans->isValid()) + { + _pBindAdapter->decreaseSendBufferSize(); + } } std::shared_ptr TC_EpollServer::Connection::onOpensslCallback(TC_Transceiver* trans) { #if TARS_SSL - if(trans->isSSL()) { - assert(_pBindAdapter->_ctx); - return TC_OpenSSL::newSSL(_pBindAdapter->_ctx); - } - return NULL; + if(trans->isSSL()) { + assert(_pBindAdapter->_ctx); + return TC_OpenSSL::newSSL(_pBindAdapter->_ctx); +} +return NULL; #else - return NULL; + return NULL; #endif } void TC_EpollServer::Connection::onRequestCallback(TC_Transceiver *trans) { - while(!_messages.empty()) - { - auto it = _messages.begin(); + while(!_messages.empty()) + { + auto it = _messages.begin(); // LOG_CONSOLE_DEBUG << string((*it)->buffer()->buffer(), (*it)->buffer()->length()) << ", message:" << _messages.size() << endl; - TC_Transceiver::ReturnStatus iRet = _trans->sendRequest((*it)->buffer(), (*it)->getRecvContext()->addr()); + TC_Transceiver::ReturnStatus iRet = _trans->sendRequest((*it)->buffer(), (*it)->getRecvContext()->addr()); - if (iRet == TC_Transceiver::eRetError) - { - return; - } + if (iRet == TC_Transceiver::eRetError) + { + return; + } - if(iRet != TC_Transceiver::eRetNotSend) - { - _messageSize -= (*it)->buffer()->length(); + if(iRet != TC_Transceiver::eRetNotSend) + { + 
_messageSize -= (*it)->buffer()->length(); - _messages.erase(it); - } + _messages.erase(it); + } - //数据还不能发送 or 发送buffer已经满了 直接返回, 暂时不要再发送了! - if (iRet == TC_Transceiver::eRetNotSend || iRet == TC_Transceiver::eRetFull) - { - return; - } - } + //数据还不能发送 or 发送buffer已经满了 直接返回, 暂时不要再发送了! + if (iRet == TC_Transceiver::eRetNotSend || iRet == TC_Transceiver::eRetFull) + { + return; + } + } } TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::Connection::onParserCallback(TC_NetWorkBuffer& rbuf, TC_Transceiver *trans) { - if(rbuf.empty()) - { - return TC_NetWorkBuffer::PACKET_LESS; - } + if(rbuf.empty()) + { + return TC_NetWorkBuffer::PACKET_LESS; + } - //需要过滤首包包头 - if (_iHeaderLen > 0) { - if (rbuf.getBufferLength() >= (unsigned) _iHeaderLen) { - vector header; - rbuf.getHeader(_iHeaderLen, header); - _pBindAdapter->getHeaderFilterFunctor()(TC_NetWorkBuffer::PACKET_FULL, header); - rbuf.moveHeader(_iHeaderLen); - _iHeaderLen = 0; - } - else { - vector header = rbuf.getBuffers(); - _pBindAdapter->getHeaderFilterFunctor()(TC_NetWorkBuffer::PACKET_LESS, header); - _iHeaderLen -= (int) rbuf.getBufferLength(); - rbuf.clearBuffers(); - return TC_NetWorkBuffer::PACKET_LESS; - } - } + //需要过滤首包包头 + if (_iHeaderLen > 0) { + if (rbuf.getBufferLength() >= (unsigned) _iHeaderLen) { + vector header; + rbuf.getHeader(_iHeaderLen, header); + _pBindAdapter->getHeaderFilterFunctor()(TC_NetWorkBuffer::PACKET_FULL, header); + rbuf.moveHeader(_iHeaderLen); + _iHeaderLen = 0; + } + else { + vector header = rbuf.getBuffers(); + _pBindAdapter->getHeaderFilterFunctor()(TC_NetWorkBuffer::PACKET_LESS, header); + _iHeaderLen -= (int) rbuf.getBufferLength(); + rbuf.clearBuffers(); + return TC_NetWorkBuffer::PACKET_LESS; + } + } - rbuf.setConnection(this); + rbuf.setConnection(this); - vector ro; + vector ro; - TC_NetWorkBuffer::PACKET_TYPE ret = _pBindAdapter->getProtocol()(rbuf, ro); + TC_NetWorkBuffer::PACKET_TYPE ret = _pBindAdapter->getProtocol()(rbuf, ro); - if (ret == TC_NetWorkBuffer::PACKET_FULL) - { - auto recv = std::make_shared(_netThread->getIndex(), getId(), trans->getClientAddr(), getfd(), _pBindAdapter->shared_from_this()); + if (ret == TC_NetWorkBuffer::PACKET_FULL) + { + auto recv = std::make_shared(_netThread->getIndex(), getId(), trans->getClientAddr(), getfd(), _pBindAdapter->shared_from_this()); - recv->buffer().swap(ro); + recv->buffer().swap(ro); - //收到完整的包才算 - this->_bEmptyConn = false; + //收到完整的包才算 + this->_bEmptyConn = false; - _recv.push_back(recv); - //收到完整包 - // insertRecvQueue(recv); - } + _recv.push_back(recv); + //收到完整包 + // insertRecvQueue(recv); + } - return ret; + return ret; } void TC_EpollServer::Connection::onCompleteNetworkCallback(TC_Transceiver *trans) { - _pBindAdapter->insertRecvQueue(_recv); + _pBindAdapter->insertRecvQueue(_recv); - //收到完整包 - // insertRecvQueue(_recv); + //收到完整包 + // insertRecvQueue(_recv); - _recv.clear(); + _recv.clear(); } int TC_EpollServer::Connection::sendBufferDirect(const char* buff, size_t length) { - _pBindAdapter->increaseSendBufferSize(); + _pBindAdapter->increaseSendBufferSize(); - if(getBindAdapter()->getEndpoint().isTcp()) { + if(getBindAdapter()->getEndpoint().isTcp()) { - return _trans->sendRequest(std::make_shared(buff, length)); - } + return _trans->sendRequest(std::make_shared(buff, length)); + } - return 0; + return 0; } int TC_EpollServer::Connection::sendBufferDirect(const std::string& buff) { - return sendBufferDirect(buff.data(), buff.length()); + return sendBufferDirect(buff.data(), buff.length()); } int 
TC_EpollServer::Connection::sendBufferDirect(const shared_ptr& buff) { - _pBindAdapter->increaseSendBufferSize(); + _pBindAdapter->increaseSendBufferSize(); - if(getBindAdapter()->getEndpoint().isTcp()) { + if(getBindAdapter()->getEndpoint().isTcp()) { - return _trans->sendRequest(buff); - } + return _trans->sendRequest(buff); + } - return 0; + return 0; } int TC_EpollServer::Connection::checkFlow(TC_NetWorkBuffer& sendBuffer, size_t lastLeftBufferSize) @@ -869,175 +870,175 @@ int TC_EpollServer::Connection::checkFlow(TC_NetWorkBuffer& sendBuffer, size_t l // 每5秒检查一下积压情况, 连续12次(一分钟), 都是积压 // 且每个检查点, 积压长度都增加或者连续3次发送buffer字节小于1k, 就关闭连接, 主要避免极端情况 - //计算本次发送的大小: 上次没法送的数据大小 - 当前没发送的数据大小 - size_t nowSendBufferSize = lastLeftBufferSize - (_messageSize + sendBuffer.getBufferLength()); + //计算本次发送的大小: 上次没法送的数据大小 - 当前没发送的数据大小 + size_t nowSendBufferSize = lastLeftBufferSize - (_messageSize + sendBuffer.getBufferLength()); // 当出现队列积压的前提下, 且积压超过一定大小 // 每5秒检查一下积压情况, 连续12次(一分钟), 都是积压 // 且每个检查点, 积压长度都增加或者连续3次发送buffer字节小于1k, 就关闭连接, 主要避免极端情况 - size_t iBackPacketBuffLimit = _pBindAdapter->getBackPacketBuffLimit(); - if(iBackPacketBuffLimit > 0 && (_messageSize + sendBuffer.getBufferLength()) > iBackPacketBuffLimit) - { - if(_accumulateBufferSize == 0) - { - //开始积压 - _lastCheckTime = TNOW; - } - //累计本次发送数据(每5秒的数据累计到一起) - _accumulateBufferSize += nowSendBufferSize; - - if (TNOW - _lastCheckTime >= 5) - { - //如果持续有积压, 则每5秒检查一次 - _lastCheckTime = TNOW; - - //记录本次累计的数据<5秒内发送数据, 剩余未发送数据> - _checkSend.push_back(make_pair(_accumulateBufferSize, lastLeftBufferSize)); - - _accumulateBufferSize = 0; - - size_t iBackPacketBuffMin = _pBindAdapter->getBackPacketBuffMin(); - - //连续3个5秒, 发送速度都极慢, 每5秒发送 < iBackPacketBuffMin, 认为连接有问题, 关闭之 - int left = 3; - if ((int)_checkSend.size() >= left) - { - bool slow = true; - for (int i = (int)_checkSend.size() - 1; i >= (int)_checkSend.size() - left; i--) - { - //发送速度 - if (_checkSend[i].first > iBackPacketBuffMin) - { - slow = false; - continue; - } - } - - if (slow) - { - ostringstream os; - os << "send [" << _ip << ":" << _port << "] buffer queue send to slow, send size:"; - - for (int i = (int)_checkSend.size() - 1; i >= (int)(_checkSend.size() - left); i--) - { - os << ", " << _checkSend[i].first; - } - - _logger->error(os.str()); - sendBuffer.clearBuffers(); - return -5; - } - } - - //连续12个5秒, 都有积压现象, 检查 - if (_checkSend.size() >= 12) - { - bool accumulate = true; - for (size_t i = _checkSend.size() - 1; i >= 1; i--) { - //发送buffer 持续增加 - if (_checkSend[i].second < _checkSend[i - 1].second) { - accumulate = false; - break; - } - } - - //持续积压 - if (accumulate) - { - ostringstream os; - os << "send [" << _ip << ":" << _port << "] buffer queue continues to accumulate data, queue size:"; - - for (size_t i = 0; i < _checkSend.size(); i++) - { - os << ", " << _checkSend[i].second; - } - - _logger->error(os.str()); - sendBuffer.clearBuffers(); - return -4; - } - - _checkSend.erase(_checkSend.begin()); - } - } - } - else - { - //无积压 - _accumulateBufferSize = 0; - _lastCheckTime = TNOW; - _checkSend.clear(); - } - return 0; + size_t iBackPacketBuffLimit = _pBindAdapter->getBackPacketBuffLimit(); + if(iBackPacketBuffLimit > 0 && (_messageSize + sendBuffer.getBufferLength()) > iBackPacketBuffLimit) + { + if(_accumulateBufferSize == 0) + { + //开始积压 + _lastCheckTime = TNOW; + } + //累计本次发送数据(每5秒的数据累计到一起) + _accumulateBufferSize += nowSendBufferSize; + + if (TNOW - _lastCheckTime >= 5) + { + //如果持续有积压, 则每5秒检查一次 + _lastCheckTime = TNOW; + + //记录本次累计的数据<5秒内发送数据, 剩余未发送数据> + 
_checkSend.push_back(make_pair(_accumulateBufferSize, lastLeftBufferSize)); + + _accumulateBufferSize = 0; + + size_t iBackPacketBuffMin = _pBindAdapter->getBackPacketBuffMin(); + + //连续3个5秒, 发送速度都极慢, 每5秒发送 < iBackPacketBuffMin, 认为连接有问题, 关闭之 + int left = 3; + if ((int)_checkSend.size() >= left) + { + bool slow = true; + for (int i = (int)_checkSend.size() - 1; i >= (int)_checkSend.size() - left; i--) + { + //发送速度 + if (_checkSend[i].first > iBackPacketBuffMin) + { + slow = false; + continue; + } + } + + if (slow) + { + ostringstream os; + os << "send [" << _ip << ":" << _port << "] buffer queue send to slow, send size:"; + + for (int i = (int)_checkSend.size() - 1; i >= (int)(_checkSend.size() - left); i--) + { + os << ", " << _checkSend[i].first; + } + + _logger->error(os.str()); + sendBuffer.clearBuffers(); + return -5; + } + } + + //连续12个5秒, 都有积压现象, 检查 + if (_checkSend.size() >= 12) + { + bool accumulate = true; + for (size_t i = _checkSend.size() - 1; i >= 1; i--) { + //发送buffer 持续增加 + if (_checkSend[i].second < _checkSend[i - 1].second) { + accumulate = false; + break; + } + } + + //持续积压 + if (accumulate) + { + ostringstream os; + os << "send [" << _ip << ":" << _port << "] buffer queue continues to accumulate data, queue size:"; + + for (size_t i = 0; i < _checkSend.size(); i++) + { + os << ", " << _checkSend[i].second; + } + + _logger->error(os.str()); + sendBuffer.clearBuffers(); + return -4; + } + + _checkSend.erase(_checkSend.begin()); + } + } + } + else + { + //无积压 + _accumulateBufferSize = 0; + _lastCheckTime = TNOW; + _checkSend.clear(); + } + return 0; } int TC_EpollServer::Connection::sendBuffer() { - TC_NetWorkBuffer& sendBuffer = _trans->getSendBuffer(); + TC_NetWorkBuffer& sendBuffer = _trans->getSendBuffer(); - //计算还有多少数据没有发送出去 - size_t lastLeftBufferSize = _messageSize + sendBuffer.getBufferLength(); + //计算还有多少数据没有发送出去 + size_t lastLeftBufferSize = _messageSize + sendBuffer.getBufferLength(); // LOG_CONSOLE_DEBUG << "buff size:" << sendBuffer.getBufferLength() << ", " << sendBuffer.getBuffersString() << endl; - _trans->doRequest(); + _trans->doRequest(); - //需要关闭链接 - if (_bClose && _trans->getSendBuffer().empty()) - { - _logger->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by user."); - return -2; - } + //需要关闭链接 + if (_bClose && _trans->getSendBuffer().empty()) + { + _logger->debug("send [" + _ip + ":" + TC_Common::tostr(_port) + "] close connection by user."); + return -2; + } - return checkFlow(sendBuffer, lastLeftBufferSize); + return checkFlow(sendBuffer, lastLeftBufferSize); } int TC_EpollServer::Connection::send(const shared_ptr &sc) { - assert(sc); + assert(sc); - _pBindAdapter->increaseSendBufferSize(); + _pBindAdapter->increaseSendBufferSize(); - //队列为空, 直接发送, 发送失败进队列 - //队列不为空, 直接进队列 - const shared_ptr& buff = sc->buffer(); + //队列为空, 直接发送, 发送失败进队列 + //队列不为空, 直接进队列 + const shared_ptr& buff = sc->buffer(); // LOG_CONSOLE_DEBUG << string(buff->buffer(), buff->length()) << ", message:" << _messages.size() << endl; if(_messages.empty()) - { - _trans->sendRequest(buff, sc->getRecvContext()->addr()); - } + { + _trans->sendRequest(buff, sc->getRecvContext()->addr()); + } - //网络句柄无效了, 返回-1, 上层会关闭连接 - if(!_trans->isValid()) - { - return -1; - } + //网络句柄无效了, 返回-1, 上层会关闭连接 + if(!_trans->isValid()) + { + return -1; + } - //数据没有发送完 - if(!buff->empty()) - { - _messageSize += buff->length(); + //数据没有发送完 + if(!buff->empty()) + { + _messageSize += buff->length(); - _messages.push_back(sc); - } + _messages.push_back(sc); + } // LOG_CONSOLE_DEBUG << 
"buff size:" << buff->length() << ", message:" << _messages.size() << endl; auto cl = _connList.lock(); - if(cl) - { - cl->refresh(getId(), getTimeout() + TNOW); - } + if(cl) + { + cl->refresh(getId(), getTimeout() + TNOW); + } - return 0; + return 0; } void TC_EpollServer::Connection::setUdpRecvBuffer(size_t nSize) { - _trans->setUdpRecvBuffer(nSize); + _trans->setUdpRecvBuffer(nSize); } void TC_EpollServer::Connection::setUdpSendBuffer(size_t nSize) @@ -1047,49 +1048,49 @@ void TC_EpollServer::Connection::setUdpSendBuffer(size_t nSize) bool TC_EpollServer::Connection::setClose() { - _bClose = true; + _bClose = true; - return _trans->getSendBuffer().empty(); + return _trans->getSendBuffer().empty(); } //////////////////////////////////////////////////////////////// // TC_EpollServer::ConnectionList::ConnectionList(detail::LogInterface* logger) - : _emptyCheckTimeout(0) - , _logger(logger) - , _total(0) - , _free_size(0) - , _vConn(NULL) - , _lastTimeoutTime(0) - , _iConnectionMagic(0) + : _emptyCheckTimeout(0) + , _logger(logger) + , _total(0) + , _free_size(0) + , _vConn(NULL) + , _lastTimeoutTime(0) + , _iConnectionMagic(0) { } void TC_EpollServer::ConnectionList::init(uint32_t size, uint32_t iIndex) { - _lastTimeoutTime = TNOW; + _lastTimeoutTime = TNOW; - _total = size; + _total = size; - _free_size = 0; + _free_size = 0; - //初始化链接链表 - if (_vConn) delete[] _vConn; + //初始化链接链表 + if (_vConn) delete[] _vConn; - //分配total+1个空间(多分配一个空间, 第一个空间其实无效) - _vConn = new list_data[_total+1]; + //分配total+1个空间(多分配一个空间, 第一个空间其实无效) + _vConn = new list_data[_total+1]; - _iConnectionMagic = ((((uint32_t)TNOW) << 26) & (0xFFFFFFFF << 26)) + ((iIndex << 22) & (0xFFFFFFFF << 22));//((uint32_t)_lastTimeoutTime) << 20; + _iConnectionMagic = ((((uint32_t)TNOW) << 26) & (0xFFFFFFFF << 26)) + ((iIndex << 22) & (0xFFFFFFFF << 22));//((uint32_t)_lastTimeoutTime) << 20; - //free从1开始分配, 这个值为uid, 0保留为管道用, epollwait根据0判断是否是管道消息 - for(uint32_t i = 1; i <= _total; i++) - { - _vConn[i].first = NULL; + //free从1开始分配, 这个值为uid, 0保留为管道用, epollwait根据0判断是否是管道消息 + for(uint32_t i = 1; i <= _total; i++) + { + _vConn[i].first = NULL; - _free.push_back(i); + _free.push_back(i); - ++_free_size; - } + ++_free_size; + } // LOG_CONSOLE_DEBUG << this << ", " << "size:" << _free.size() << endl; @@ -1097,295 +1098,295 @@ void TC_EpollServer::ConnectionList::init(uint32_t size, uint32_t iIndex) void TC_EpollServer::ConnectionList::close() { - if(_vConn) - { - //服务停止时, 主动关闭一下连接, 这样客户端会检测到, 不需要等下一个发送包时, 发送失败才知道连接被关闭 - for (auto it = _tl.begin(); it != _tl.end(); ++it) { - if (_vConn[it->second].first != NULL) { - _vConn[it->second].first->close(); - delete _vConn[it->second].first; - _vConn[it->second].first = NULL; - } - } - _tl.clear(); - delete[] _vConn; - _vConn = NULL; - } + if(_vConn) + { + //服务停止时, 主动关闭一下连接, 这样客户端会检测到, 不需要等下一个发送包时, 发送失败才知道连接被关闭 + for (auto it = _tl.begin(); it != _tl.end(); ++it) { + if (_vConn[it->second].first != NULL) { + _vConn[it->second].first->close(); + delete _vConn[it->second].first; + _vConn[it->second].first = NULL; + } + } + _tl.clear(); + delete[] _vConn; + _vConn = NULL; + } } void TC_EpollServer::ConnectionList::closeConnections(weak_ptr bindAdapter) { - auto adapter = bindAdapter.lock(); + auto adapter = bindAdapter.lock(); - if(_vConn && adapter) - { - multimap tl = _tl; + if(_vConn && adapter) + { + multimap tl = _tl; - // LOG_CONSOLE_DEBUG << "list1 size:" << tl.size() << endl; + // LOG_CONSOLE_DEBUG << "list1 size:" << tl.size() << endl; - //服务停止时, 主动关闭一下连接, 这样客户端会检测到, 不需要等下一个发送包时, 发送失败才知道连接被关闭 - 
for (auto it = tl.begin(); it != tl.end(); ++it) - { - if (_vConn[it->second].first != NULL && _vConn[it->second].first->getBindAdapter() == adapter.get()) - { - this->delConnection(_vConn[it->second].first, true, TC_EpollServer::EM_SERVER_CLOSE); - } - } + //服务停止时, 主动关闭一下连接, 这样客户端会检测到, 不需要等下一个发送包时, 发送失败才知道连接被关闭 + for (auto it = tl.begin(); it != tl.end(); ++it) + { + if (_vConn[it->second].first != NULL && _vConn[it->second].first->getBindAdapter() == adapter.get()) + { + this->delConnection(_vConn[it->second].first, true, TC_EpollServer::EM_SERVER_CLOSE); + } + } - // LOG_CONSOLE_DEBUG << "list2 size:" << _tl.size() << endl; - } + // LOG_CONSOLE_DEBUG << "list2 size:" << _tl.size() << endl; + } } uint32_t TC_EpollServer::ConnectionList::getUniqId() { - TC_LockT lock(_mutex); + TC_LockT lock(_mutex); - assert(!_free.empty()); + assert(!_free.empty()); - uint32_t uid = _free.front(); + uint32_t uid = _free.front(); - assert(uid > 0 && uid <= _total); + assert(uid > 0 && uid <= _total); - _free.pop_front(); + _free.pop_front(); - --_free_size; + --_free_size; - return _iConnectionMagic | uid; + return _iConnectionMagic | uid; } TC_EpollServer::Connection* TC_EpollServer::ConnectionList::get(uint32_t uid) { - uint32_t magi = uid & (0xFFFFFFFF << 22); - uid = uid & (0x7FFFFFFF >> 9); + uint32_t magi = uid & (0xFFFFFFFF << 22); + uid = uid & (0x7FFFFFFF >> 9); - if (magi != _iConnectionMagic) return NULL; + if (magi != _iConnectionMagic) return NULL; - return _vConn[uid].first; + return _vConn[uid].first; } void TC_EpollServer::ConnectionList::add(Connection *cPtr, time_t iTimeOutStamp) { // LOG_CONSOLE_DEBUG << "add timeout:" << iTimeOutStamp << endl; - uint32_t muid = cPtr->getId(); - uint32_t magi = muid & (0xFFFFFFFF << 22); - uint32_t uid = muid & (0x7FFFFFFF >> 9); + uint32_t muid = cPtr->getId(); + uint32_t magi = muid & (0xFFFFFFFF << 22); + uint32_t uid = muid & (0x7FFFFFFF >> 9); - TC_LockT lock(_mutex); + TC_LockT lock(_mutex); - assert(magi == _iConnectionMagic && uid > 0 && uid <= _total && !_vConn[uid].first); + assert(magi == _iConnectionMagic && uid > 0 && uid <= _total && !_vConn[uid].first); - _vConn[uid] = make_pair(cPtr, _tl.insert(make_pair(iTimeOutStamp, uid))); + _vConn[uid] = make_pair(cPtr, _tl.insert(make_pair(iTimeOutStamp, uid))); } void TC_EpollServer::ConnectionList::refresh(uint32_t uid, time_t iTimeOutStamp) { - uint32_t magi = uid & (0xFFFFFFFF << 22); - uid = uid & (0x7FFFFFFF >> 9); + uint32_t magi = uid & (0xFFFFFFFF << 22); + uid = uid & (0x7FFFFFFF >> 9); - TC_LockT lock(_mutex); + TC_LockT lock(_mutex); - assert(magi == _iConnectionMagic && uid > 0 && uid <= _total && _vConn[uid].first); + assert(magi == _iConnectionMagic && uid > 0 && uid <= _total && _vConn[uid].first); - //至少一秒才刷新一次 - if(iTimeOutStamp - _vConn[uid].first->_iLastRefreshTime < 1) - { - return; - } + //至少一秒才刷新一次 + if(iTimeOutStamp - _vConn[uid].first->_iLastRefreshTime < 1) + { + return; + } - _vConn[uid].first->_iLastRefreshTime = iTimeOutStamp; + _vConn[uid].first->_iLastRefreshTime = iTimeOutStamp; - //删除超时链表 - _tl.erase(_vConn[uid].second); + //删除超时链表 + _tl.erase(_vConn[uid].second); - _vConn[uid].second = _tl.insert(make_pair(iTimeOutStamp, uid)); + _vConn[uid].second = _tl.insert(make_pair(iTimeOutStamp, uid)); } void TC_EpollServer::ConnectionList::delConnection(Connection *cPtr, bool bEraseList, EM_CLOSE_T closeType) { - assert(cPtr->isTcp()); + assert(cPtr->isTcp()); - BindAdapter* adapter = cPtr->getBindAdapter(); + BindAdapter* adapter = cPtr->getBindAdapter(); - 
//如果是TCP的连接才真正的关闭连接 - //false的情况,是超时被主动删除 - if(!bEraseList && _logger) - { - _logger->debug("timeout [" + cPtr->getIp() + ":" + TC_Common::tostr(cPtr->getPort()) + "] del from list"); - } + //如果是TCP的连接才真正的关闭连接 + //false的情况,是超时被主动删除 + if(!bEraseList && _logger) + { + _logger->debug("timeout [" + cPtr->getIp() + ":" + TC_Common::tostr(cPtr->getPort()) + "] del from list"); + } - uint32_t uid = cPtr->getId(); + uint32_t uid = cPtr->getId(); - //构造一个recv,通知业务该连接的关闭事件 - shared_ptr recv = std::make_shared(cPtr->getNetThread()->getIndex(), uid, cPtr->getTransceiver()->getClientAddr(), cPtr->getfd(), adapter->shared_from_this(), true, (int)closeType); + //构造一个recv,通知业务该连接的关闭事件 + shared_ptr recv = std::make_shared(cPtr->getNetThread()->getIndex(), uid, cPtr->getTransceiver()->getClientAddr(), cPtr->getfd(), adapter->shared_from_this(), true, (int)closeType); - adapter->insertRecvQueue(recv, true); + adapter->insertRecvQueue(recv, true); - //从epoller删除句柄放在close之前, 否则重用socket时会有问题 - cPtr->close(); + //从epoller删除句柄放在close之前, 否则重用socket时会有问题 + cPtr->close(); - //对于超时检查, 由于锁的原因, 在这里不从链表中删除 - if(bEraseList) - { - del(uid); - } + //对于超时检查, 由于锁的原因, 在这里不从链表中删除 + if(bEraseList) + { + del(uid); + } - adapter->decreaseNowConnection(); + adapter->decreaseNowConnection(); } void TC_EpollServer::ConnectionList::checkTimeout() { - time_t iCurTime = TNOW; + time_t iCurTime = TNOW; - //至少1s才能检查一次 - if(iCurTime - _lastTimeoutTime < 1) - { - return; - } + //至少1s才能检查一次 + if(iCurTime - _lastTimeoutTime < 1) + { + return; + } - _lastTimeoutTime = iCurTime; + _lastTimeoutTime = iCurTime; - TC_LockT lock(_mutex); + TC_LockT lock(_mutex); - multimap::iterator it = _tl.begin(); + multimap::iterator it = _tl.begin(); - while(it != _tl.end()) - { - //已经检查到当前时间点了, 后续不用在检查了 - if(it->first > iCurTime) - { - break; - } + while(it != _tl.end()) + { + //已经检查到当前时间点了, 后续不用在检查了 + if(it->first > iCurTime) + { + break; + } - uint32_t uid = it->second; + uint32_t uid = it->second; - ++it; + ++it; - //udp的监听端口, 不做处理 - if(_vConn[uid].first->isUdp()) - { - continue; - } + //udp的监听端口, 不做处理 + if(_vConn[uid].first->isUdp()) + { + continue; + } - //配置的超时间大于0, 才需要检查连接是否超时 - if(_vConn[uid].first->getTimeout() > 0) - { - //超时关闭 - delConnection(_vConn[uid].first, false, EM_SERVER_TIMEOUT_CLOSE); + //配置的超时间大于0, 才需要检查连接是否超时 + if(_vConn[uid].first->getTimeout() > 0) + { + //超时关闭 + delConnection(_vConn[uid].first, false, EM_SERVER_TIMEOUT_CLOSE); - //从链表中删除 - delNoLock(uid); - } - } + //从链表中删除 + delNoLock(uid); + } + } - if(_emptyCheckTimeout > 0) - { - it = _tl.begin(); - while(it != _tl.end()) - { - uint32_t uid = it->second; + if(_emptyCheckTimeout > 0) + { + it = _tl.begin(); + while(it != _tl.end()) + { + uint32_t uid = it->second; - //遍历所有的空连接 - if(_vConn[uid].first->isTcp() && _vConn[uid].first->isEmptyConn()) - { - //获取空连接的超时时间点 - time_t iEmptyTimeout = (it->first - _vConn[uid].first->getTimeout()) + _emptyCheckTimeout/1000; + //遍历所有的空连接 + if(_vConn[uid].first->isTcp() && _vConn[uid].first->isEmptyConn()) + { + //获取空连接的超时时间点 + time_t iEmptyTimeout = (it->first - _vConn[uid].first->getTimeout()) + _emptyCheckTimeout/1000; - //已经检查到当前时间点了, 后续不用在检查了 - if(iEmptyTimeout > iCurTime) - { - break; - } + //已经检查到当前时间点了, 后续不用在检查了 + if(iEmptyTimeout > iCurTime) + { + break; + } - //超时关闭 - delConnection(_vConn[uid].first, false, EM_SERVER_TIMEOUT_CLOSE); + //超时关闭 + delConnection(_vConn[uid].first, false, EM_SERVER_TIMEOUT_CLOSE); - //从链表中删除 - delNoLock(uid); - } + //从链表中删除 + delNoLock(uid); + } - ++it; - } - } + ++it; + } + } } vector 
TC_EpollServer::ConnectionList::getConnStatus(int lfd) { - vector v; + vector v; - TC_LockT lock(_mutex); + TC_LockT lock(_mutex); - for(size_t i = 1; i <= _total; i++) - { - //是当前监听端口的连接 - if(_vConn[i].first != NULL && _vConn[i].first->isTcp() && _vConn[i].first->getBindAdapter()->getSocket().getfd() == lfd) //getListenfd() == lfd) - { - ConnStatus cs; + for(size_t i = 1; i <= _total; i++) + { + //是当前监听端口的连接 + if(_vConn[i].first != NULL && _vConn[i].first->isTcp() && _vConn[i].first->getBindAdapter()->getSocket().getfd() == lfd) //getListenfd() == lfd) + { + ConnStatus cs; - cs.iLastRefreshTime = _vConn[i].first->_iLastRefreshTime; - cs.ip = _vConn[i].first->getIp(); - cs.port = _vConn[i].first->getPort(); - cs.timeout = _vConn[i].first->getTimeout(); - cs.uid = _vConn[i].first->getId(); - cs.recvBufferSize = _vConn[i].first->getRecvBuffer().getBufferLength(); - cs.sendBufferSize = _vConn[i].first->getSendBuffer().getBufferLength(); + cs.iLastRefreshTime = _vConn[i].first->_iLastRefreshTime; + cs.ip = _vConn[i].first->getIp(); + cs.port = _vConn[i].first->getPort(); + cs.timeout = _vConn[i].first->getTimeout(); + cs.uid = _vConn[i].first->getId(); + cs.recvBufferSize = _vConn[i].first->getRecvBuffer().getBufferLength(); + cs.sendBufferSize = _vConn[i].first->getSendBuffer().getBufferLength(); - v.push_back(cs); - } - } + v.push_back(cs); + } + } - return v; + return v; } void TC_EpollServer::ConnectionList::del(uint32_t uid) { - TC_LockT lock(_mutex); + TC_LockT lock(_mutex); - uint32_t magi = uid & (0xFFFFFFFF << 22); - uid = uid & (0x7FFFFFFF >> 9); + uint32_t magi = uid & (0xFFFFFFFF << 22); + uid = uid & (0x7FFFFFFF >> 9); - assert(magi == _iConnectionMagic && uid > 0 && uid <= _total && _vConn[uid].first); + assert(magi == _iConnectionMagic && uid > 0 && uid <= _total && _vConn[uid].first); - delNoLock(uid); + delNoLock(uid); } void TC_EpollServer::ConnectionList::delNoLock(uint32_t uid) { - assert(uid > 0 && uid <= _total && _vConn[uid].first); + assert(uid > 0 && uid <= _total && _vConn[uid].first); - _tl.erase(_vConn[uid].second); + _tl.erase(_vConn[uid].second); - Connection *conn = _vConn[uid].first; + Connection *conn = _vConn[uid].first; - _vConn[uid].first = NULL; + _vConn[uid].first = NULL; - delete conn; + delete conn; - _free.push_back(uid); + _free.push_back(uid); - ++_free_size; + ++_free_size; } /////////////////////////////BindAdapter/////////////////////////////////// TC_EpollServer::BindAdapter::BindAdapter(TC_EpollServer *epollServer) - : _pReportQueue(NULL) - , _pReportConRate(NULL) - , _pReportTimeoutNum(NULL) - , _epollServer(epollServer) - , _pf(echo_protocol) - , _hf(echo_header_filter) - , _name("") - , _iMaxConns(DEFAULT_MAX_CONN) - , _iCurConns(0) - , _iHandleNum(0) - , _eOrder(ALLOW_DENY) - , _iQueueCapacity(DEFAULT_QUEUE_CAP) - , _iQueueTimeout(DEFAULT_QUEUE_TIMEOUT) - , _iHeaderLen(0) - , _iHeartBeatTime(0) - , _protocolName("tars") + : _pReportQueue(NULL) + , _pReportConRate(NULL) + , _pReportTimeoutNum(NULL) + , _epollServer(epollServer) + , _pf(echo_protocol) + , _hf(echo_header_filter) + , _name("") + , _iMaxConns(DEFAULT_MAX_CONN) + , _iCurConns(0) + , _iHandleNum(0) + , _eOrder(ALLOW_DENY) + , _iQueueCapacity(DEFAULT_QUEUE_CAP) + , _iQueueTimeout(DEFAULT_QUEUE_TIMEOUT) + , _iHeaderLen(0) + , _iHeartBeatTime(0) + , _protocolName("tars") { } @@ -1397,337 +1398,337 @@ TC_EpollServer::BindAdapter::~BindAdapter() void TC_EpollServer::BindAdapter::bind() { - // try - // { + // try + // { - assert(!_s.isValid()); + assert(!_s.isValid()); #if 
TARGET_PLATFORM_WINDOWS - int type = _ep.isIPv6() ? AF_INET6 : AF_INET; + int type = _ep.isIPv6() ? AF_INET6 : AF_INET; #else - int type = _ep.isUnixLocal() ? AF_LOCAL : _ep.isIPv6() ? AF_INET6 : AF_INET; + int type = _ep.isUnixLocal() ? AF_LOCAL : _ep.isIPv6() ? AF_INET6 : AF_INET; #endif - int flag = 0; + int flag = 0; #if TARGET_PLATFORM_LINUX - flag = SOCK_CLOEXEC; + flag = SOCK_CLOEXEC; #endif - if (_ep.isTcp()) - { - _s.createSocket(SOCK_STREAM | flag, type); - } - else - { - _s.createSocket(SOCK_DGRAM | flag, type); - } + if (_ep.isTcp()) + { + _s.createSocket(SOCK_STREAM | flag, type); + } + else + { + _s.createSocket(SOCK_DGRAM | flag, type); + } #if TARGET_PLATFORM_WINDOWS - _s.bind(_ep.getHost(), _ep.getPort()); + _s.bind(_ep.getHost(), _ep.getPort()); #else - if (_ep.isUnixLocal()) - { - _s.bind(_ep.getHost().c_str()); - } - else - { - _s.bind(_ep.getHost(), _ep.getPort()); - } + if (_ep.isUnixLocal()) + { + _s.bind(_ep.getHost().c_str()); + } + else + { + _s.bind(_ep.getHost(), _ep.getPort()); + } #endif - if (_ep.isTcp()) - { - _s.listen(10240); + if (_ep.isTcp()) + { + _s.listen(10240); - try - { - //不要设置close wait否则http服务回包主动关闭连接会有问题 - _s.setNoCloseWait(); - _s.setKeepAlive(); - _s.setTcpNoDelay(); - } - catch(exception &ex) - { - } - } - _s.setblock(false); - // } - // catch(exception &ex) - // { - // _s.close(); - // cerr << "bind:" << _ep.toString() << " error:" << ex.what() << endl; - // throw ex; - // } + try + { + //不要设置close wait否则http服务回包主动关闭连接会有问题 + _s.setNoCloseWait(); + _s.setKeepAlive(); + _s.setTcpNoDelay(); + } + catch(exception &ex) + { + } + } + _s.setblock(false); + // } + // catch(exception &ex) + // { + // _s.close(); + // cerr << "bind:" << _ep.toString() << " error:" << ex.what() << endl; + // throw ex; + // } } void TC_EpollServer::BindAdapter::setNetThreads(const vector &netThreads) { - _netThreads = netThreads; + _netThreads = netThreads; - for(auto netThread : _netThreads) - { - netThread->addAdapter(this); - } + for(auto netThread : _netThreads) + { + netThread->addAdapter(this); + } } void TC_EpollServer::BindAdapter::initUdp(NetThread* netThread) { - //对于udp, 网络线程监听所有监听的udp端口 - if(this->_ep.isUdp()) - { - Connection *cPtr = new Connection(netThread->getConnectionList(), this, _s.getfd(), _epollServer); + //对于udp, 网络线程监听所有监听的udp端口 + if(this->_ep.isUdp()) + { + Connection *cPtr = new Connection(netThread->getConnectionList(), this, _s.getfd(), _epollServer); - netThread->addUdpConnection(cPtr); + netThread->addUdpConnection(cPtr); - } + } } void TC_EpollServer::BindAdapter::setProtocolName(const string &name) { - std::lock_guard lock (_mutex); + std::lock_guard lock (_mutex); - _protocolName = name; + _protocolName = name; } const string &TC_EpollServer::BindAdapter::getProtocolName() { - std::lock_guard lock (_mutex); + std::lock_guard lock (_mutex); - return _protocolName; + return _protocolName; } bool TC_EpollServer::BindAdapter::isTarsProtocol() { - return (_protocolName == "tars" || _protocolName == "tars"); + return (_protocolName == "tars" || _protocolName == "tars"); } bool TC_EpollServer::BindAdapter::isIpAllow(const string &ip) const { - std::lock_guard lock (_mutex); - - if (_eOrder == ALLOW_DENY) - { - if (TC_Common::matchPeriod(ip, _vtAllow)) - { - return true; - } - if (TC_Common::matchPeriod(ip, _vtDeny)) - { - return false; - } - } - else - { - if (TC_Common::matchPeriod(ip, _vtDeny)) - { - return false; - } - if (TC_Common::matchPeriod(ip, _vtAllow)) - { - return true; - } - } - return _vtAllow.size() == 0; + std::lock_guard lock 
(_mutex); + + if (_eOrder == ALLOW_DENY) + { + if (TC_Common::matchPeriod(ip, _vtAllow)) + { + return true; + } + if (TC_Common::matchPeriod(ip, _vtDeny)) + { + return false; + } + } + else + { + if (TC_Common::matchPeriod(ip, _vtDeny)) + { + return false; + } + if (TC_Common::matchPeriod(ip, _vtAllow)) + { + return true; + } + } + return _vtAllow.size() == 0; } void TC_EpollServer::BindAdapter::enableManualListen() { - _manualListen = true; + _manualListen = true; } void TC_EpollServer::BindAdapter::manualListen() { - if(!this->getSocket().isValid() && !_epollServer->isTerminate()) - { - weak_ptr weakPtr = shared_from_this(); + if(!this->getSocket().isValid() && !_epollServer->isTerminate()) + { + weak_ptr weakPtr = shared_from_this(); - auto func = std::bind(&TC_EpollServer::listenCallback, _epollServer, weakPtr); + auto func = std::bind(&TC_EpollServer::listenCallback, _epollServer, weakPtr); - _epollServer->getEpoller()->asyncCallback(func); - } + _epollServer->getEpoller()->asyncCallback(func); + } } void TC_EpollServer::BindAdapter::cancelListen() { - if(this->getSocket().isValid() && !_epollServer->isTerminate()) - { - weak_ptr weakPtr = shared_from_this(); + if(this->getSocket().isValid() && !_epollServer->isTerminate()) + { + weak_ptr weakPtr = shared_from_this(); - auto func = std::bind(&TC_EpollServer::listenCallback, _epollServer, weakPtr); + auto func = std::bind(&TC_EpollServer::listenCallback, _epollServer, weakPtr); - _epollServer->getEpoller()->asyncCallback(func); - } + _epollServer->getEpoller()->asyncCallback(func); + } } void TC_EpollServer::BindAdapter::insertRecvQueue(const shared_ptr &recv, bool force) { - int iRet = isOverloadorDiscard(); + int iRet = isOverloadorDiscard(); - if (iRet == 0 || force) //未过载 - { - _dataBuffer->insertRecvQueue(recv); - } - else if (iRet == -1) //超过队列长度4/5,需要进行overload处理 - { - recv->setOverload(); + if (iRet == 0 || force) //未过载 + { + _dataBuffer->insertRecvQueue(recv); + } + else if (iRet == -1) //超过队列长度4/5,需要进行overload处理 + { + recv->setOverload(); - _dataBuffer->insertRecvQueue(recv); - } - else //接受队列满,需要丢弃 - { - _epollServer->error("[BindAdapter::insertRecvQueue] overload discard package"); - } + _dataBuffer->insertRecvQueue(recv); + } + else //接受队列满,需要丢弃 + { + _epollServer->error("[BindAdapter::insertRecvQueue] overload discard package"); + } } void TC_EpollServer::BindAdapter::insertRecvQueue(const deque> &recv) { - int iRet = isOverloadorDiscard(); + int iRet = isOverloadorDiscard(); - if (iRet == 0) //未过载 - { - _dataBuffer->insertRecvQueue(recv); - } - else if (iRet == -1) //超过队列长度4/5,需要进行overload处理 - { - for(auto r : recv) - { - r->setOverload(); - } + if (iRet == 0) //未过载 + { + _dataBuffer->insertRecvQueue(recv); + } + else if (iRet == -1) //超过队列长度4/5,需要进行overload处理 + { + for(auto r : recv) + { + r->setOverload(); + } - _dataBuffer->insertRecvQueue(recv); - } - else //接受队列满,需要丢弃 - { - _epollServer->error("[BindAdapter::insertRecvQueue] overload discard package"); - } + _dataBuffer->insertRecvQueue(recv); + } + else //接受队列满,需要丢弃 + { + _epollServer->error("[BindAdapter::insertRecvQueue] overload discard package"); + } } TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_protocol(TC_NetWorkBuffer &r, vector &o) { - o = r.getBuffers(); + o = r.getBuffers(); - r.clearBuffers(); + r.clearBuffers(); - return TC_NetWorkBuffer::PACKET_FULL; + return TC_NetWorkBuffer::PACKET_FULL; } TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_header_filter(TC_NetWorkBuffer::PACKET_TYPE i, vector &o) { - return 
TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_protocol(TC_NetWorkBuffer &r, vector<char> &o)
{
-	o = r.getBuffers();
-
-	r.clearBuffers();
-
-	return TC_NetWorkBuffer::PACKET_FULL;
+    o = r.getBuffers();
+
+    r.clearBuffers();
+
+    return TC_NetWorkBuffer::PACKET_FULL;
}

TC_NetWorkBuffer::PACKET_TYPE TC_EpollServer::BindAdapter::echo_header_filter(TC_NetWorkBuffer::PACKET_TYPE i, vector<char> &o)
{
-	return TC_NetWorkBuffer::PACKET_FULL;
+    return TC_NetWorkBuffer::PACKET_FULL;
}

int TC_EpollServer::BindAdapter::isOverloadorDiscard()
{
-	int iRecvBufferSize = _dataBuffer->getRecvBufferSize();
-
-	if(iRecvBufferSize > (int)(_iQueueCapacity / 5.*4) && (iRecvBufferSize < _iQueueCapacity) && (_iQueueCapacity > 0)) // overload
-	{
-		// treat the adapter as overloaded once the queue is more than 4/5 full
-		return -1;
-	}
-	else if(iRecvBufferSize > (int)(_iQueueCapacity) && _iQueueCapacity > 0) // the queue is full: incoming packets have to be dropped
-	{
-		return -2;
-	}
-	return 0;
+    int iRecvBufferSize = _dataBuffer->getRecvBufferSize();
+
+    if(iRecvBufferSize > (int)(_iQueueCapacity / 5.*4) && (iRecvBufferSize < _iQueueCapacity) && (_iQueueCapacity > 0)) // overload
+    {
+        // treat the adapter as overloaded once the queue is more than 4/5 full
+        return -1;
+    }
+    else if(iRecvBufferSize > (int)(_iQueueCapacity) && _iQueueCapacity > 0) // the queue is full: incoming packets have to be dropped
+    {
+        return -2;
+    }
+    return 0;
}

void TC_EpollServer::BindAdapter::setQueueTimeout(int t)
{
-	if (t >= MIN_QUEUE_TIMEOUT)
-	{
-		_iQueueTimeout = t;
-	}
-	else
-	{
-		_iQueueTimeout = MIN_QUEUE_TIMEOUT;
-	}
+    if (t >= MIN_QUEUE_TIMEOUT)
+    {
+        _iQueueTimeout = t;
+    }
+    else
+    {
+        _iQueueTimeout = MIN_QUEUE_TIMEOUT;
+    }
}

void TC_EpollServer::BindAdapter::setProtocol(const TC_NetWorkBuffer::protocol_functor &pf, int iHeaderLen, const TC_EpollServer::header_filter_functor &hf)
{
-	_pf = pf;
-
-	_hf = hf;
-
-	_iHeaderLen = iHeaderLen;
+    _pf = pf;
+
+    _hf = hf;
+
+    _iHeaderLen = iHeaderLen;
}
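
echo_protocol/echo_header_filter above are the trivial pass-through parser used when no protocol is configured; a real parser registered through setProtocol() waits for a complete frame. The sketch below shows the PACKET_LESS/PACKET_FULL/PACKET_ERR contract with a hypothetical 4-byte big-endian length prefix; a plain std::vector stands in for TC_NetWorkBuffer so none of its accessors are guessed.

    #include <cstdint>
    #include <vector>

    enum PacketType { PACKET_LESS, PACKET_FULL, PACKET_ERR }; // mirrors TC_NetWorkBuffer::PACKET_TYPE

    // Assumed frame layout: 4-byte big-endian total length, header included.
    PacketType parseFrame(std::vector<char>& in, std::vector<char>& out)
    {
        if (in.size() < 4) return PACKET_LESS;                  // header incomplete
        uint32_t len = (uint32_t)(uint8_t)in[0] << 24 | (uint32_t)(uint8_t)in[1] << 16
                     | (uint32_t)(uint8_t)in[2] << 8  | (uint32_t)(uint8_t)in[3];
        if (len < 4 || len > 100 * 1024 * 1024) return PACKET_ERR; // sanity bound
        if (in.size() < len) return PACKET_LESS;                // body incomplete
        out.assign(in.begin() + 4, in.begin() + (ptrdiff_t)len); // strip the header
        in.erase(in.begin(), in.begin() + (ptrdiff_t)len);
        return PACKET_FULL;
    }
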
//////////////////////////////NetThread//////////////////////////////////
TC_EpollServer::NetThread::NetThread(int threadIndex, TC_EpollServer *epollServer)
-	: _epoller(NULL)
-	, _threadIndex(threadIndex)
-	, _epollServer(epollServer)
+    : _epoller(NULL)
+    , _threadIndex(threadIndex)
+    , _epollServer(epollServer)
{
-	_list = std::make_shared<ConnectionList>(_epollServer);
+    _list = std::make_shared<ConnectionList>(_epollServer);
}

TC_EpollServer::NetThread::~NetThread()
{
-	_list = NULL;
+    _list = NULL;
}

bool TC_EpollServer::NetThread::isReady() const
{
-	return _scheduler && _scheduler->isReady();
+    return _scheduler && _scheduler->isReady();
}

void TC_EpollServer::NetThread::createEpoll(uint32_t maxAllConn)
{
-	_list->init((uint32_t) maxAllConn, _threadIndex + 1);
+    _list->init((uint32_t) maxAllConn, _threadIndex + 1);
}

void TC_EpollServer::NetThread::delConnection(TC_EpollServer::Connection *cPtr, bool bEraseList, TC_EpollServer::EM_CLOSE_T closeType)
{
-	if (cPtr->isTcp())
-	{
-		_list->delConnection(cPtr, bEraseList, closeType);
-	}
+    if (cPtr->isTcp())
+    {
+        _list->delConnection(cPtr, bEraseList, closeType);
+    }
}

void TC_EpollServer::NetThread::terminate()
{
-	assert(_scheduler);
-
-	_scheduler->terminate();
+    assert(_scheduler);
+
+    _scheduler->terminate();
}

void TC_EpollServer::NetThread::notifyCloseConnectionList(const shared_ptr<BindAdapter> &adapter)
{
-	weak_ptr<BindAdapter> wAdapter = adapter;
-
-	auto func = std::bind(&TC_EpollServer::ConnectionList::closeConnections, _list, adapter);
-
-	_epoller->syncCallback(func);
+    weak_ptr<BindAdapter> wAdapter = adapter;
+
+    auto func = std::bind(&TC_EpollServer::ConnectionList::closeConnections, _list, adapter);
+
+    _epoller->syncCallback(func);
}

void TC_EpollServer::NetThread::addTcpConnection(TC_EpollServer::Connection *cPtr)
{
-	uint32_t uid = _list->getUniqId();
+    uint32_t uid = _list->getUniqId();

-	cPtr->initialize(_epoller, uid, this);
+    cPtr->initialize(_epoller, uid, this);

-	_list->add(cPtr, cPtr->getTimeout() + TNOW);
+    _list->add(cPtr, cPtr->getTimeout() + TNOW);

-	cPtr->getBindAdapter()->increaseNowConnection();
+    cPtr->getBindAdapter()->increaseNowConnection();

-	// note: the epoll add must come last, otherwise the event could fire and complete before the statements above have run
-	cPtr->registerEvent(this);
+    // note: the epoll add must come last, otherwise the event could fire and complete before the statements above have run
+    cPtr->registerEvent(this);
}

void TC_EpollServer::NetThread::addUdpConnection(TC_EpollServer::Connection *cPtr)
{
-	assert(_epoller != NULL);
+    assert(_epoller != NULL);

-	uint32_t uid = _list->getUniqId();
+    uint32_t uid = _list->getUniqId();

-	cPtr->initialize(_epoller, uid, this);
+    cPtr->initialize(_epoller, uid, this);

 // allocate the receive buffer for udp
 cPtr->setUdpRecvBuffer(_epollServer->getUdpRecvBufferSize());

@@ -1735,174 +1736,178 @@ void TC_EpollServer::NetThread::addUdpConnection(TC_EpollServer::Connection *cPt

 _list->add(cPtr, cPtr->getTimeout() + TNOW);

-	cPtr->registerEvent(this);
+    cPtr->registerEvent(this);
}

void TC_EpollServer::NetThread::close(const shared_ptr<RecvContext> &data)
{
-	if(this->_epollServer->isTerminate())
-	{
-		return;
-	}
+    if(this->_epollServer->isTerminate())
+    {
+        return;
+    }

-	shared_ptr<SendContext> send = data->createCloseContext();
+    shared_ptr<SendContext> send = data->createCloseContext();

-	_sbuffer.push_back(send);
+    _sbuffer.push_back(send);

 // wake the epoll loop so it can process the close request
-	notify();
+    notify();
}

void TC_EpollServer::NetThread::send(const shared_ptr<SendContext> &data)
{
-	if(this->_epollServer->isTerminate())
-	{
-		return;
-	}
+    if(this->_epollServer->isTerminate())
+    {
+        return;
+    }

-	if(!_list)
-	{
-		// the connection list has already been destroyed
-		return;
-	}
+    if(!_list)
+    {
+        // the connection list has already been destroyed
+        return;
+    }

-	if (_threadId == TC_Thread::CURRENT_THREADID())
-	{
-		// the sending thread is the net thread itself: send directly
-		Connection *cPtr = getConnectionPtr(data->uid());
-
-		if (cPtr)
-		{
-			cPtr->send(data);
-		}
-	}
-	else
-	{
-		// the sending thread is not the net thread: queue the packet first, then wake the net thread to send it
-		_sbuffer.push_back(data);
-
-		notify();
-	}
+    if (_threadId == TC_Thread::CURRENT_THREADID())
+    {
+        // the sending thread is the net thread itself: send directly
+        Connection *cPtr = getConnectionPtr(data->uid());
+
+        if (cPtr)
+        {
+            cPtr->send(data);
+        }
+    }
+    else
+    {
+        // the sending thread is not the net thread: queue the packet first, then wake the net thread to send it
+        _sbuffer.push_back(data);
+
+        notify();
+    }
}
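
send() above splits into a same-thread fast path and a cross-thread queue-and-notify handoff, which is why _sbuffer plus notify() appears in several places. A minimal, self-contained sketch of that handoff pattern follows; all names are illustrative, and the real code uses its own queue type and an epoll wakeup rather than a mutex-guarded deque.

    #include <deque>
    #include <mutex>
    #include <thread>

    struct Packet {};

    class Sender {
    public:
        void send(Packet p) {
            if (std::this_thread::get_id() == _netThreadId) {
                writeToSocket(p);            // fast path: already on the net thread
            } else {
                {
                    std::lock_guard<std::mutex> g(_mtx);
                    _queue.push_back(p);     // hand the packet over...
                }
                wakeupEpoll();               // ...and interrupt epoll_wait
            }
        }
    private:
        void writeToSocket(Packet) {}
        void wakeupEpoll() {}                // e.g. an eventfd or pipe write
        std::thread::id _netThreadId;
        std::mutex _mtx;
        std::deque<Packet> _queue;
    };
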
void TC_EpollServer::NetThread::processPipe()
{
-	while (!_sbuffer.empty())
-	{
-		shared_ptr<SendContext> sc = _sbuffer.front();
-
-		_sbuffer.pop_front();
-
-		Connection *cPtr = getConnectionPtr(sc->uid());
-
-		if (!cPtr)
-		{
-			continue;
-		}
-
-		switch (sc->cmd())
-		{
-		case 'c': // close request
-		{
-			if (cPtr->setClose())
-			{
-				delConnection(cPtr, true, EM_SERVER_CLOSE);
-			}
-			break;
-		}
-		case 's': // send request
-		{
-			int ret = cPtr->send(sc);
-			if (ret < 0)
-			{
-				delConnection(cPtr, true, (ret == -1) ? EM_CLIENT_CLOSE : EM_SERVER_CLOSE);
-			}
-			break;
-		}
-		default:
-			assert(false);
-		}
-	}
+    while (!_sbuffer.empty())
+    {
+        shared_ptr<SendContext> sc = _sbuffer.front();
+
+        _sbuffer.pop_front();
+
+        Connection *cPtr = getConnectionPtr(sc->uid());
+
+        if (!cPtr)
+        {
+            continue;
+        }
+
+        switch (sc->cmd())
+        {
+        case 'c': // close request
+        {
+            if (cPtr->setClose())
+            {
+                delConnection(cPtr, true, EM_SERVER_CLOSE);
+            }
+            break;
+        }
+        case 's': // send request
+        {
+            int ret = cPtr->send(sc);
+            if (ret < 0)
+            {
+                delConnection(cPtr, true, (ret == -1) ? EM_CLIENT_CLOSE : EM_SERVER_CLOSE);
+            }
+            break;
+        }
+        default:
+            assert(false);
+        }
+    }
}

void TC_EpollServer::NetThread::setInitializeHandle(std::function<void()> initialize, std::function<void()> handle)
{
-	_initialize = initialize;
-	_handle = handle;
+    _initialize = initialize;
+    _handle = handle;
}

void TC_EpollServer::NetThread::run()
{
-	try
-	{
-		if(_epollServer->getOpenCoroutine() == NET_THREAD_QUEUE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == NET_THREAD_MERGE_HANDLES_THREAD)
-		{
-			// thread model: the scheduler is local to this thread, so code running here does not see itself as being inside a coroutine
-			_scheduler = std::make_shared<TC_CoroutineScheduler>();
-			_scheduler->setPoolStackSize(10, 128*1024);
-		}
-		else
-		{
-			_scheduler = TC_CoroutineScheduler::create();
-
-			_scheduler->setPoolStackSize(_epollServer->getCoroutinePoolSize(), _epollServer->getCoroutineStackSize());
-		}
-
-		_epoller = _scheduler->getEpoller();
-		_epoller->setName("net-thread");
-
-		assert(_epoller);
-
-		_threadId = TC_Thread::CURRENT_THREADID();
-
-		// for udp, every net thread listens on all bound udp ports and competes for incoming data
-		for(auto adapter : _adapters)
-		{
-			adapter->initUdp(this);
-		}
-
-		_epoller->postRepeated(2000, false, std::bind(&ConnectionList::checkTimeout, _list));
-
-		if(_initialize)
-		{
-			_initialize();
-		}
-
-		if(_handle)
-		{
-			_epoller->idle(_handle);
-		}
-
-		_epoller->idle(std::bind(&NetThread::processPipe, this));
-
-		_epollServer->notifyThreadReady();
-
-		_scheduler->run();
-
-		_list->close();
-	}
-	catch(const std::exception& ex)
-	{
-		_epollServer->error(ex.what());
-	}
-	catch(...)
-	{
-		_epollServer->error("NetThread::run error");
-	}
+    try
+    {
+        if(_epollServer->getOpenCoroutine() == NET_THREAD_QUEUE_HANDLES_THREAD || _epollServer->getOpenCoroutine() == NET_THREAD_MERGE_HANDLES_THREAD)
+        {
+            // thread model: the scheduler is local to this thread, so code running here does not see itself as being inside a coroutine
+            _scheduler = std::make_shared<TC_CoroutineScheduler>();
+            _scheduler->setPoolStackSize(10, 128*1024);
+        }
+        else
+        {
+            _scheduler = TC_CoroutineScheduler::create();
+
+            _scheduler->setPoolStackSize(_epollServer->getCoroutinePoolSize(), _epollServer->getCoroutineStackSize());
+        }
+
+        _epoller = _scheduler->getEpoller();
+        _epoller->setName("net-thread");
+
+        assert(_epoller);
+
+        _threadId = TC_Thread::CURRENT_THREADID();
+
+        // for udp, every net thread listens on all bound udp ports and competes for incoming data
+        for(auto adapter : _adapters)
+        {
+            adapter->initUdp(this);
+        }
+
+        _epoller->postRepeated(2000, false, std::bind(&ConnectionList::checkTimeout, _list));
+
+        if(_initialize)
+        {
+            _initialize();
+        }
+
+        if(_handle)
+        {
+            _epoller->idle(_handle);
+        }
+
+        _epoller->idle(std::bind(&NetThread::processPipe, this));
+
+        _epollServer->notifyThreadReady();
+
+        _scheduler->run();
+
+        _list->close();
+
+        if(_epollServer->getOpenCoroutine() != NET_THREAD_QUEUE_HANDLES_THREAD && _epollServer->getOpenCoroutine() != NET_THREAD_MERGE_HANDLES_THREAD)
+        {
+            // release the scheduler obtained from create() so its epoller fd is freed (the fd-leak fix)
+            TC_CoroutineScheduler::reset();
+        }
+    }
+    catch(const std::exception& ex)
+    {
+        _epollServer->error(ex.what());
+    }
+    catch(...)
+    {
+        _epollServer->error("NetThread::run error");
+    }
}
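
The block added at the end of run() is the core of this patch's fd-leak fix. In the coroutine modes the scheduler comes from TC_CoroutineScheduler::create(), which keeps it in per-thread storage, and the scheduler owns a TC_Epoller holding an epoll fd; without reset() that storage outlives the loop and the fd is stranded. The sketch below shows the shape of the leak and of the fix, assuming a thread-local slot like the one create()/reset() manage; all names here are illustrative.

    #include <memory>

    struct Epoller { /* owns an epoll fd; closes it in its destructor */ };

    struct Scheduler {
        std::shared_ptr<Epoller> epoller = std::make_shared<Epoller>();
    };

    thread_local std::shared_ptr<Scheduler> g_scheduler;   // what create() fills

    std::shared_ptr<Scheduler> create() {
        if (!g_scheduler) g_scheduler = std::make_shared<Scheduler>();
        return g_scheduler;
    }

    void reset() { g_scheduler.reset(); }   // drop the thread-local reference

    void netThreadRun() {
        auto sched = create();
        // ... event loop ...
        reset();    // without this call the epoll fd lives on in the TLS slot
    }

The thread (non-coroutine) modes skip the reset because, as the code above shows, they build the scheduler with a plain make_shared and never touch the thread-local slot.
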
/////////////////////////////////////////////////////////////////////////////////////////////////////////
TC_EpollServer::TC_EpollServer(unsigned int iNetThreadNum)
-: _threadNum(iNetThreadNum)
-, _pLocalLogger(NULL)
-, _acceptFunc(NULL)
+    : _threadNum(iNetThreadNum)
+    , _pLocalLogger(NULL)
+    , _acceptFunc(NULL)
{
#if TARGET_PLATFORM_WINDOWS
-	WSADATA wsadata;
+    WSADATA wsadata;
 WSAStartup(MAKEWORD(2, 2), &wsadata);
#endif
 //
@@ -1912,10 +1917,10 @@ WSAStartup(MAKEWORD(2, 2), &wsadata);

TC_EpollServer::~TC_EpollServer()
{
-	terminate();
+    terminate();

#if TARGET_PLATFORM_WINDOWS
-	WSACleanup();
+    WSACleanup();
#endif
}

@@ -1925,62 +1930,62 @@ void TC_EpollServer::applicationCallback(TC_EpollServer *epollServer)

bool TC_EpollServer::accept(int fd, int domain)
{
-	struct sockaddr_in stSockAddr4;
-	struct ::sockaddr_in6 stSockAddr6;
+    struct sockaddr_in stSockAddr4;
+    struct ::sockaddr_in6 stSockAddr6;

-	socklen_t iSockAddrSize = (AF_INET6 == domain) ? sizeof(::sockaddr_in6) : sizeof(sockaddr_in);
-	struct sockaddr *stSockAddr = (AF_INET6 == domain) ? (struct sockaddr *) &stSockAddr6 : (struct sockaddr *) &stSockAddr4;
+    socklen_t iSockAddrSize = (AF_INET6 == domain) ? sizeof(::sockaddr_in6) : sizeof(sockaddr_in);
+    struct sockaddr *stSockAddr = (AF_INET6 == domain) ? (struct sockaddr *) &stSockAddr6 : (struct sockaddr *) &stSockAddr4;

-	TC_Socket cs;
-
-	cs.setOwner(false);
+    TC_Socket cs;
+
+    cs.setOwner(false);

-	// accept the incoming connection
-	TC_Socket s;
-
-	s.init(fd, false, domain);
-	int iRetCode = s.accept(cs, (struct sockaddr *) stSockAddr, iSockAddrSize);
+    // accept the incoming connection
+    TC_Socket s;
+
+    s.init(fd, false, domain);
+    int iRetCode = s.accept(cs, (struct sockaddr *) stSockAddr, iSockAddrSize);

-	if (iRetCode > 0)
-	{
-		string ip;
-		uint16_t port;
-
-		char sAddr[INET6_ADDRSTRLEN] = "\0";
-
-		inet_ntop(domain,
-				  (AF_INET6 == domain) ? (void *) &stSockAddr6.sin6_addr : (void *) &stSockAddr4.sin_addr,
-				  sAddr,
-				  sizeof(sAddr));
-
-		port = (AF_INET6 == domain) ? ntohs(stSockAddr6.sin6_port) : ntohs(stSockAddr4.sin_port);
-		ip = sAddr;
-
-		debug("accept [" + ip + ":" + TC_Common::tostr(port) + "] [" + TC_Common::tostr(cs.getfd()) + "] incoming");
-
-		const BindAdapterPtr &adapter = _listeners[fd];
-
-		if (!adapter->isIpAllow(ip))
-		{
-			debug("accept [" + ip + ":" + TC_Common::tostr(port) + "] [" + TC_Common::tostr(cs.getfd()) + "] not allowed");
-
-			cs.close();
-
-			return true;
-		}
-
-		if (adapter->isLimitMaxConnection())
-		{
-			error("accept [" + ip + ":" + TC_Common::tostr(port) + "][" + TC_Common::tostr(cs.getfd())
-				  + "] beyond max connection:" + TC_Common::tostr(_listeners[fd]->getMaxConns()));
-			cs.close();
-
-			return true;
-		}
-
-		cs.setblock(false);
+    if (iRetCode > 0)
+    {
+        string ip;
+        uint16_t port;
+
+        char sAddr[INET6_ADDRSTRLEN] = "\0";
+
+        inet_ntop(domain,
+                  (AF_INET6 == domain) ? (void *) &stSockAddr6.sin6_addr : (void *) &stSockAddr4.sin_addr,
+                  sAddr,
+                  sizeof(sAddr));
+
+        port = (AF_INET6 == domain) ? ntohs(stSockAddr6.sin6_port) : ntohs(stSockAddr4.sin_port);
+        ip = sAddr;
+
+        debug("accept [" + ip + ":" + TC_Common::tostr(port) + "] [" + TC_Common::tostr(cs.getfd()) + "] incoming");
+
+        const BindAdapterPtr &adapter = _listeners[fd];
+
+        if (!adapter->isIpAllow(ip))
+        {
+            debug("accept [" + ip + ":" + TC_Common::tostr(port) + "] [" + TC_Common::tostr(cs.getfd()) + "] not allowed");
+
+            cs.close();
+
+            return true;
+        }
+
+        if (adapter->isLimitMaxConnection())
+        {
+            error("accept [" + ip + ":" + TC_Common::tostr(port) + "][" + TC_Common::tostr(cs.getfd())
+                  + "] beyond max connection:" + TC_Common::tostr(_listeners[fd]->getMaxConns()));
+            cs.close();
+
+            return true;
+        }
+
+        cs.setblock(false);

 try
 {
@@ -1993,623 +1998,634 @@ bool TC_EpollServer::accept(int fd, int domain)
 error("accept [" + ip + ":" + TC_Common::tostr(port) + "] set keep alive error:" + string(ex.what()));
 }

-		const std::vector<NetThread*> &netThreads = adapter->getNetThreads();
-
-		NetThread *netThread = netThreads[cs.getfd() % netThreads.size()];
-
-		Connection *cPtr = new Connection(netThread->getConnectionList(), adapter.get(), cs.getfd(), ip, port, this);
-
-		// filter the header of the connection's first packet
-		cPtr->setHeaderFilterLen((int) adapter->getHeaderFilterLen());
-
-		netThread->addTcpConnection(cPtr);
-
-		return true;
-	}
-	// accept failed (e.g. EAGAIN, the backlog is drained): tell the caller to stop accepting
-	return false;
+    const std::vector<NetThread*> &netThreads = adapter->getNetThreads();
+
+    NetThread *netThread = netThreads[cs.getfd() % netThreads.size()];
+
+    Connection *cPtr = new Connection(netThread->getConnectionList(), adapter.get(), cs.getfd(), ip, port, this);
+
+    // filter the header of the connection's first packet
+    cPtr->setHeaderFilterLen((int) adapter->getHeaderFilterLen());
+
+    netThread->addTcpConnection(cPtr);
+
+    return true;
+    }
+    // accept failed (e.g. EAGAIN, the backlog is drained): tell the caller to stop accepting
+    return false;
}
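
accept() pins each new connection to a net thread with a plain modulo over the accepted fd; the helper below just restates that mapping with concrete numbers. The stable fd-to-thread pinning means all events for one connection stay on one thread, so no cross-thread locking is needed on the hot path.

    #include <cstddef>

    // With 3 net threads, accepted fds 7, 8, 9 land on threads 1, 2, 0:
    //   7 % 3 == 1,  8 % 3 == 2,  9 % 3 == 0
    size_t pickNetThread(int fd, size_t netThreadCount)
    {
        return static_cast<size_t>(fd) % netThreadCount;
    }
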
bool TC_EpollServer::acceptCallback(const shared_ptr<TC_Epoller::EpollInfo> &info, weak_ptr<BindAdapter> adapterPtr)
{
-	auto adapter = adapterPtr.lock();
-
-	if(!adapter)
-	{
-		return false;
-	}
+    auto adapter = adapterPtr.lock();
+
+    if(!adapter)
+    {
+        return false;
+    }

#if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
-	bool ret;
+    bool ret;
 do
 {
 ret = accept(info->fd(), adapter->getEndpoint().isIPv6() ? AF_INET6 : AF_INET);
-	} while (ret);
+    } while (ret);
#else
-	accept(info->fd(), adapter->_ep.isIPv6() ? AF_INET6 : AF_INET);
+    accept(info->fd(), adapter->_ep.isIPv6() ? AF_INET6 : AF_INET);
#endif

-	return true;
+    return true;
}
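
The do/while in acceptCallback() matters under epoll: one readable event on the listening fd can represent many queued connections, so accept() is called until it reports failure. The generic loop below shows what that drain amounts to at the ::accept level; it is illustrative, since the member accept() above wraps the raw call.

    #include <sys/socket.h>
    #include <cerrno>

    void onListenReadable(int listenFd)
    {
        for (;;)
        {
            int fd = ::accept(listenFd, nullptr, nullptr);
            if (fd >= 0)
            {
                // hand fd off to a net thread ...
                continue;
            }
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;          // backlog drained: wait for the next epoll event
            if (errno == EINTR)
                continue;       // interrupted: retry
            break;              // real error: give up for this event
        }
    }
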
void TC_EpollServer::listenCallback(weak_ptr<BindAdapter> adapterPtr)
{
-	auto adapter = adapterPtr.lock();
-
-	if(!adapter)
-	{
-		return;
-	}
+    auto adapter = adapterPtr.lock();
+
+    if(!adapter)
+    {
+        return;
+    }

-	if(!adapter->getSocket().isValid())
-	{
-		// the socket is not open yet: this is a request to bind and start listening
-		adapter->bind();
-
-		int fd = adapter->getSocket().getfd();
-
-		_listeners[fd] = adapter;
-
-		auto info = _epoller->createEpollInfo(fd);
-
-		adapter->setEpollInfo(info);
-
-		map<uint32_t, TC_Epoller::EpollInfo::EVENT_CALLBACK> callbacks;
-
-		callbacks[EPOLLIN] = std::bind(&TC_EpollServer::acceptCallback, this, std::placeholders::_1, adapter);
-
-		adapter->getEpollInfo()->registerCallback(callbacks, EPOLLIN);
-
-		this->info("bind adapter:" + adapter->getName());
-	}
-	else
-	{
-		// already listening: this is a request to unbind
-		int bindFd = adapter->getSocket().getfd();
-
-		_listeners.erase(bindFd);
-
-		adapter->getSocket().close();
-
-		// tell every net thread to close the connections that belong to this BindAdapter
-		for(auto nt : this->_netThreads)
-		{
-			nt->notifyCloseConnectionList(adapter);
-		}
-
-		_epoller->releaseEpollInfo(adapter->getEpollInfo());
-
-		adapter->getEpollInfo().reset();
-
-		info("unbind adapter:" + adapter->getName());
-	}
+    if(!adapter->getSocket().isValid())
+    {
+        // the socket is not open yet: this is a request to bind and start listening
+        adapter->bind();
+
+        int fd = adapter->getSocket().getfd();
+
+        _listeners[fd] = adapter;
+
+        auto info = _epoller->createEpollInfo(fd);
+
+        adapter->setEpollInfo(info);
+
+        map<uint32_t, TC_Epoller::EpollInfo::EVENT_CALLBACK> callbacks;
+
+        callbacks[EPOLLIN] = std::bind(&TC_EpollServer::acceptCallback, this, std::placeholders::_1, adapter);
+
+        adapter->getEpollInfo()->registerCallback(callbacks, EPOLLIN);
+
+        this->info("bind adapter:" + adapter->getName());
+    }
+    else
+    {
+        // already listening: this is a request to unbind
+        int bindFd = adapter->getSocket().getfd();
+
+        _listeners.erase(bindFd);
+
+        adapter->getSocket().close();
+
+        // tell every net thread to close the connections that belong to this BindAdapter
+        for(auto nt : this->_netThreads)
+        {
+            nt->notifyCloseConnectionList(adapter);
+        }
+
+        _epoller->releaseEpollInfo(adapter->getEpollInfo());
+
+        adapter->getEpollInfo().reset();
+
+        info("unbind adapter:" + adapter->getName());
+    }
}
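
manualListen()/cancelListen() further up both post listenCallback() onto the epoll thread, which lets a service delay opening (or later close) its port. A hypothetical usage sketch, with adapter construction elided because it depends on the application:

    // Hypothetical deferred-listen flow; adapter and server already configured.
    void deferListen(TC_EpollServer& server, TC_EpollServer::BindAdapterPtr& adapter)
    {
        adapter->enableManualListen();   // must precede bind(), which otherwise
        server.bind(adapter);            // opens the listening socket immediately

        // ... warm caches, connect to back ends, load config ...

        adapter->manualListen();         // now open the port (async, on the epoll thread)
        // adapter->cancelListen();      // would close it again later
    }
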
void TC_EpollServer::waitForShutdown()
{
-	_epoller = new TC_Epoller();
-	_epoller->create(10240);
-	_epoller->setName("epollserver-epoller");
-
-	_readyThreadNum = 0;
-
-	initHandle();
-
-	createEpoll();
-
-	startHandle();
-
-	if(_hf)
-	{
-		_epoller->postRepeated(5000, false, [&](){ _hf(this);});
-	}
-
-	for(auto it : _bindAdapters)
-	{
-		if(it->getEndpoint().isTcp())
-		{
-			if(it->getSocket().isValid())
-			{
-				// the socket is valid, i.e. already bound: register the accept callback
-				shared_ptr<TC_Epoller::EpollInfo> info = _epoller->createEpollInfo(it->getSocket().getfd());
-
-				it->setEpollInfo(info);
-
-				map<uint32_t, TC_Epoller::EpollInfo::EVENT_CALLBACK> callbacks;
-
-				callbacks[EPOLLIN] = std::bind(&TC_EpollServer::acceptCallback, this, std::placeholders::_1, it);
-
-				info->registerCallback(callbacks, EPOLLIN);
-			}
-		}
-	}
-
-	// wait until every coroutine scheduler has started; otherwise an early incoming connection would hit a half-initialized thread!
-	waitForReady();
-
-	_epoller->loop(10000);
-
-	_epoller->clear();
-
-	// stop the handle threads
-	stopThread();
-
-	// close the listening ports
-	auto it = _listeners.begin();
-	while(it != _listeners.end())
-	{
-		if(it->second->getEndpoint().isTcp())
-		{
-			_epoller->releaseEpollInfo(it->second->getEpollInfo());
-		}
-
-		TC_Port::closeSocket(it->first);
-
-		++it;
-	}
-	_listeners.clear();
-
-	// shutdown callback
-	if(_qf)
-	{
-		try
-		{
-			_qf(this);
-		}
-		catch (exception& ex)
-		{
-			// the user callback must not break the teardown
-		}
-	}
-
-	// delete the net threads
-	for (size_t i = 0; i < _netThreads.size(); ++i)
-	{
-		delete _netThreads[i];
-	}
-
-	_netThreads.clear();
-
-	delete _epoller;
-
-	_epoller = NULL;
-
-	std::unique_lock<std::mutex> lock(_readyMutex);
-
-	_readyThreadNum = 0;
-
-	_readyCond.notify_all();
+    _epoller = new TC_Epoller();
+    _epoller->create(10240);
+    _epoller->setName("epollserver-epoller");
+
+    _readyThreadNum = 0;
+
+    initHandle();
+
+    createEpoll();
+
+    startHandle();
+
+    if(_hf)
+    {
+        _epoller->postRepeated(5000, false, [&](){ _hf(this);});
+    }
+
+    for(auto it : _bindAdapters)
+    {
+        if(it->getEndpoint().isTcp())
+        {
+            if(it->getSocket().isValid())
+            {
+                // the socket is valid, i.e. already bound: register the accept callback
+                shared_ptr<TC_Epoller::EpollInfo> info = _epoller->createEpollInfo(it->getSocket().getfd());
+
+                it->setEpollInfo(info);
+
+                map<uint32_t, TC_Epoller::EpollInfo::EVENT_CALLBACK> callbacks;
+
+                callbacks[EPOLLIN] = std::bind(&TC_EpollServer::acceptCallback, this, std::placeholders::_1, it);
+
+                info->registerCallback(callbacks, EPOLLIN);
+            }
+        }
+    }
+
+    // wait until every coroutine scheduler has started; otherwise an early incoming connection would hit a half-initialized thread!
+    waitForReady();
+
+    _epoller->loop(10000);
+
+    _epoller->clear();
+
+    // stop the handle threads
+    stopThread();
+
+    // close the listening ports
+    auto it = _listeners.begin();
+    while(it != _listeners.end())
+    {
+        if(it->second->getEndpoint().isTcp())
+        {
+            _epoller->releaseEpollInfo(it->second->getEpollInfo());
+        }
+
+        TC_Port::closeSocket(it->first);
+
+        ++it;
+    }
+    _listeners.clear();
+
+    // shutdown callback
+    if(_qf)
+    {
+        try
+        {
+            _qf(this);
+        }
+        catch (exception& ex)
+        {
+            // the user callback must not break the teardown
+        }
+    }
+
+    // delete the net threads
+    for (size_t i = 0; i < _netThreads.size(); ++i)
+    {
+        delete _netThreads[i];
+    }
+
+    _netThreads.clear();
+
+    // drop the Handle objects held by each adapter so they are destroyed here
+    for (auto & bindAdapter : _bindAdapters)
+    {
+        bindAdapter->getHandles().clear();
+    }
+
+    delete _epoller;
+
+    _epoller = NULL;
+
+    std::unique_lock<std::mutex> lock(_readyMutex);
+
+    _readyThreadNum = 0;
+
+    _readyCond.notify_all();
}

void TC_EpollServer::terminate()
{
-	if(_epoller == NULL || _epoller->isTerminate())
-	{
-		return;
-	}
+    if(_epoller == NULL || _epoller->isTerminate())
+    {
+        return;
+    }

-	// stop the net threads first
-	_epoller->terminate();
+    // stop the net threads first
+    _epoller->terminate();

-	// wait for waitForShutdown() to exit!
-	std::unique_lock<std::mutex> lock(_readyMutex);
-	_readyCond.wait(lock, [&]{
-		return _readyThreadNum == 0;
-	});
+    // wait for waitForShutdown() to exit!
+    std::unique_lock<std::mutex> lock(_readyMutex);
+    _readyCond.wait(lock, [&]{
+        return _readyThreadNum == 0;
+    });
}
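
waitForShutdown() runs the accept/epoll loop and then tears everything down in a fixed order, while terminate() may be called from any thread and blocks until that teardown has finished (signalled by _readyThreadNum dropping back to zero). A hypothetical lifecycle sketch; adapters and handles are assumed to be configured already:

    #include <thread>

    void runServer(TC_EpollServer& server)
    {
        std::thread loop([&] {
            server.waitForShutdown();    // blocks in the epoll loop until terminated
        });

        // ... later, from any other thread (e.g. on SIGTERM):
        server.terminate();              // stops the loop and waits for the teardown
        loop.join();
    }
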
void TC_EpollServer::setEmptyConnTimeout(int timeout)
{
-	for (size_t i = 0; i < _netThreads.size(); ++i)
-	{
-		_netThreads[i]->setEmptyConnTimeout(timeout);
-	}
+    for (size_t i = 0; i < _netThreads.size(); ++i)
+    {
+        _netThreads[i]->setEmptyConnTimeout(timeout);
+    }
}

int TC_EpollServer::bind(BindAdapterPtr &lsPtr)
{
-	auto it = _listeners.begin();
-
-	while (it != _listeners.end())
-	{
-		if (it->second->getName() == lsPtr->getName())
-		{
-			throw TC_Exception("bind name '" + lsPtr->getName() + "' conflicts.");
-		}
-		++it;
-	}
-
-	if(!lsPtr->isManualListen())
-	{
-		lsPtr->bind();
-
-		_listeners[lsPtr->getSocket().getfd()] = lsPtr;
-	}
-
-	_bindAdapters.push_back(lsPtr);
-
-	return lsPtr->getSocket().getfd();
+    auto it = _listeners.begin();
+
+    while (it != _listeners.end())
+    {
+        if (it->second->getName() == lsPtr->getName())
+        {
+            throw TC_Exception("bind name '" + lsPtr->getName() + "' conflicts.");
+        }
+        ++it;
+    }
+
+    if(!lsPtr->isManualListen())
+    {
+        lsPtr->bind();
+
+        _listeners[lsPtr->getSocket().getfd()] = lsPtr;
+    }
+
+    _bindAdapters.push_back(lsPtr);
+
+    return lsPtr->getSocket().getfd();
}
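
bind() above rejects duplicate adapter names with a TC_Exception, so misconfiguration surfaces at startup instead of as silent shadowing of a listener. A minimal, hypothetical call site:

    #include <iostream>

    // BindAdapter construction is elided; it depends on the application.
    void safeBind(TC_EpollServer& server, TC_EpollServer::BindAdapterPtr& adapter)
    {
        try
        {
            server.bind(adapter);   // throws TC_Exception("bind name '...' conflicts.")
        }
        catch (TC_Exception& ex)
        {
            // a second adapter reused an existing name; fail startup loudly
            std::cerr << ex.what() << std::endl;
            throw;
        }
    }
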
void TC_EpollServer::initHandle()
{
-	if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
-	{
-		// standalone net threads exchange data with the handle threads through queues
-		// different connections are dispatched to different net threads
-		// net threads receive data and push it onto the owning adapter's queue; handle threads/coroutines compete for it
-		for (int i = 0; i < _threadNum; ++i)
-		{
-			NetThread *netThread = new NetThread(i, this);
-
-			_netThreads.push_back(netThread);
-		}
-
-		for (auto & bindAdapter : _bindAdapters)
-		{
-			bindAdapter->setNetThreads(_netThreads);
-		}
-
-		// wire each handle to its adapter's data buffer
-		for (auto & bindAdapter : _bindAdapters)
-		{
-			const vector<HandlePtr> & hds = bindAdapter->getHandles();
-
-			for (uint32_t i = 0; i < hds.size(); ++i)
-			{
-				hds[i]->setDataBuffer(bindAdapter->getDataBuffer());
-			}
-		}
-	}
-	else if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO || _openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
-	{
-		// the net thread and the handle thread are the same thread (one-to-one)
-		// different connections are dispatched to different net threads
-		for (auto & bindAdapter : _bindAdapters)
-		{
-			vector<HandlePtr> & hds = bindAdapter->getHandles();
-
-			vector<NetThread*> adapterNetThreads;
-
-			for (uint32_t i = 0; i < hds.size(); ++i)
-			{
-				// note: net threads belong to the adapter here, so the index restarts from 0 for each adapter
-				NetThread *netThread = new NetThread(i, this);
-
-				_netThreads.push_back(netThread);
-
-				adapterNetThreads.push_back(netThread);
-
-				hds[i]->setDataBuffer(bindAdapter->getDataBuffer());
-				hds[i]->setNetThread(netThread);
-
-				if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO)
-				{
-					netThread->setInitializeHandle(std::bind(&Handle::initialize, hds[i]), std::bind(&Handle::handleOnceCoroutine, hds[i]));
-				}
-				else if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
-				{
-					netThread->setInitializeHandle(std::bind(&Handle::initialize, hds[i]), std::bind(&Handle::handleOnceThread, hds[i]));
-				}
-			}
-
-			bindAdapter->setNetThreads(adapterNetThreads);
-		}
-	}
-	else
-	{
-		error("please check server pattern, exit!");
-		exit(-1);
-	}
+    if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
+    {
+        // standalone net threads exchange data with the handle threads through queues
+        // different connections are dispatched to different net threads
+        // net threads receive data and push it onto the owning adapter's queue; handle threads/coroutines compete for it
+        for (int i = 0; i < _threadNum; ++i)
+        {
+            NetThread *netThread = new NetThread(i, this);
+
+            _netThreads.push_back(netThread);
+        }
+
+        for (auto & bindAdapter : _bindAdapters)
+        {
+            bindAdapter->setNetThreads(_netThreads);
+        }
+
+        // wire each handle to its adapter's data buffer
+        for (auto & bindAdapter : _bindAdapters)
+        {
+            const vector<HandlePtr> & hds = bindAdapter->getHandles();
+
+            for (uint32_t i = 0; i < hds.size(); ++i)
+            {
+                hds[i]->setDataBuffer(bindAdapter->getDataBuffer());
+            }
+        }
+    }
+    else if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO || _openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
+    {
+        // the net thread and the handle thread are the same thread (one-to-one)
+        // different connections are dispatched to different net threads
+        for (auto & bindAdapter : _bindAdapters)
+        {
+            vector<HandlePtr> & hds = bindAdapter->getHandles();
+
+            vector<NetThread*> adapterNetThreads;
+
+            for (uint32_t i = 0; i < hds.size(); ++i)
+            {
+                // note: net threads belong to the adapter here, so the index restarts from 0 for each adapter
+                NetThread *netThread = new NetThread(i, this);
+
+                _netThreads.push_back(netThread);
+
+                adapterNetThreads.push_back(netThread);
+
+                hds[i]->setDataBuffer(bindAdapter->getDataBuffer());
+                hds[i]->setNetThread(netThread);
+
+                if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO)
+                {
+                    netThread->setInitializeHandle(std::bind(&Handle::initialize, hds[i]), std::bind(&Handle::handleOnceCoroutine, hds[i]));
+                }
+                else if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
+                {
+                    netThread->setInitializeHandle(std::bind(&Handle::initialize, hds[i]), std::bind(&Handle::handleOnceThread, hds[i]));
+                }
+            }
+
+            bindAdapter->setNetThreads(adapterNetThreads);
+        }
+    }
+    else
+    {
+        error("please check server pattern, exit!");
+        exit(-1);
+    }
}
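
initHandle() above wires up the two server patterns: queue mode, where net threads feed separate handle threads or coroutines through DataBuffer queues, and merge mode, where each handle runs on its own combined net thread. The pattern is chosen up front through setOpenCoroutine(); an illustrative configuration sketch (values are placeholders):

    void configurePattern(TC_EpollServer& server)
    {
        // queue mode: net threads hand work to separate handle threads
        server.setOpenCoroutine(TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD);

        // or merge mode with coroutines: one thread per handle does both jobs
        // server.setOpenCoroutine(TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO);
        // server.setCoroutineStack(1024, 128 * 1024);   // pool size, stack bytes
    }
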
void TC_EpollServer::startHandle()
{
-	if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
-	{
-		for (size_t i = 0; i < _netThreads.size(); ++i)
-		{
-			_netThreads[i]->start();
-		}
-
-		int handleNum = 0;
-
-		// start the handle threads
-		for (auto & bindAdapter : _bindAdapters)
-		{
-			handleNum += bindAdapter->getHandleNum();
-
-			const vector<HandlePtr> & hds = bindAdapter->getHandles();
-
-			for (uint32_t i = 0; i < hds.size(); ++i)
-			{
-				if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD)
-				{
-					_handlePool.exec(std::bind(&Handle::handleLoopThread, hds[i]));
-				}
-				else if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
-				{
-					_handlePool.exec(std::bind(&Handle::handleLoopCoroutine, hds[i]));
-				}
-			}
-		}
-
-		_handlePool.init(handleNum);
-
-		_handlePool.start();
-	}
-	else if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO || _openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
-	{
-		// the net thread and the handle thread are the same thread (one-to-one)
-		// different connections are dispatched to different net threads
-		for (auto & bindAdapter : _bindAdapters)
-		{
-			vector<HandlePtr> & hds = bindAdapter->getHandles();
-
-			for (uint32_t i = 0; i < hds.size(); ++i)
-			{
-				hds[i]->getNetThread()->start();
-			}
-		}
-	}
-	else
-	{
-		error("please check server pattern, exit!");
-		exit(-1);
-	}
+    if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
+    {
+        for (size_t i = 0; i < _netThreads.size(); ++i)
+        {
+            _netThreads[i]->start();
+        }
+
+        int handleNum = 0;
+
+        // start the handle threads
+        for (auto & bindAdapter : _bindAdapters)
+        {
+            handleNum += bindAdapter->getHandleNum();
+
+            const vector<HandlePtr> & hds = bindAdapter->getHandles();
+
+            for (uint32_t i = 0; i < hds.size(); ++i)
+            {
+                if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD)
+                {
+                    _handlePool.exec(std::bind(&Handle::handleLoopThread, hds[i]));
+                }
+                else if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
+                {
+                    _handlePool.exec(std::bind(&Handle::handleLoopCoroutine, hds[i]));
+                }
+            }
+        }
+
+        _handlePool.init(handleNum);
+
+        _handlePool.start();
+    }
+    else if(_openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_CO || _openCoroutine == TC_EpollServer::NET_THREAD_MERGE_HANDLES_THREAD)
+    {
+        // the net thread and the handle thread are the same thread (one-to-one)
+        // different connections are dispatched to different net threads
+        for (auto & bindAdapter : _bindAdapters)
+        {
+            vector<HandlePtr> & hds = bindAdapter->getHandles();
+
+            for (uint32_t i = 0; i < hds.size(); ++i)
+            {
+                hds[i]->getNetThread()->start();
+            }
+        }
+    }
+    else
+    {
+        error("please check server pattern, exit!");
+        exit(-1);
+    }
}

void TC_EpollServer::notifyThreadReady()
{
-	std::unique_lock<std::mutex> lock(_readyMutex);
-
-	++_readyThreadNum;
-
-	_readyCond.notify_all();
+    std::unique_lock<std::mutex> lock(_readyMutex);
+
+    ++_readyThreadNum;
+
+    _readyCond.notify_all();
}

void TC_EpollServer::waitForReady()
{
-	int readyThreadNum = 0;
-
-	if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
-	{
-		readyThreadNum = _threadNum;
-	}
-
-	for (auto & bindAdapter : _bindAdapters)
-	{
-		const vector<HandlePtr> & hds = bindAdapter->getHandles();
-
-		readyThreadNum += hds.size();
-	}
-
-	{
-		std::unique_lock<std::mutex> lock(_readyMutex);
-		if(_readyThreadNum == readyThreadNum)
-		{
-			return;
-		}
-		_readyCond.wait(lock, [&]{
-			return _readyThreadNum == readyThreadNum;
-		});
-	}
+    int readyThreadNum = 0;
+
+    if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
+    {
+        readyThreadNum = _threadNum;
+    }
+
+    for (auto & bindAdapter : _bindAdapters)
+    {
+        const vector<HandlePtr> & hds = bindAdapter->getHandles();
+
+        readyThreadNum += hds.size();
+    }
+
+    {
+        std::unique_lock<std::mutex> lock(_readyMutex);
+        if(_readyThreadNum == readyThreadNum)
+        {
+            return;
+        }
+        _readyCond.wait(lock, [&]{
+            return _readyThreadNum == readyThreadNum;
+        });
+    }
}
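
notifyThreadReady()/waitForReady() above form a startup barrier: every net and handle thread bumps a counter under the mutex, and the coordinator blocks until the expected count is reached, so no listener goes live before every scheduler can take traffic. The same pattern in isolation:

    #include <condition_variable>
    #include <mutex>

    class ReadyBarrier {
    public:
        explicit ReadyBarrier(int expected) : _expected(expected) {}

        void notifyReady() {                       // called by each worker thread
            std::unique_lock<std::mutex> lock(_m);
            ++_ready;
            _cv.notify_all();
        }

        void wait() {                              // called by the coordinator
            std::unique_lock<std::mutex> lock(_m);
            _cv.wait(lock, [&] { return _ready == _expected; });
        }

    private:
        std::mutex _m;
        std::condition_variable _cv;
        int _ready = 0;
        int _expected;
    };
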
void TC_EpollServer::stopThread()
{
-	if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
-	{
-		// stop the net threads first (if the handle threads/coroutines were stopped first, a late network event could crash the server)
-		for (size_t i = 0; i < _netThreads.size(); ++i)
-		{
-			if (_netThreads[i]->joinable())
-			{
-				_netThreads[i]->terminate();
-
-				_netThreads[i]->getThreadControl().join();
-			}
-		}
-
-		// stop the handle threads
-		for (auto & bindAdapter : _bindAdapters)
-		{
-			const vector<HandlePtr> & hds = bindAdapter->getHandles();
-
-			for (uint32_t i = 0; i < hds.size(); ++i)
-			{
-				hds[i]->terminate();
-			}
-		}
-
-		_handlePool.stop();
-	}
-	else
-	{
-		// merged mode: the net thread and the handle thread are the same thread, so stopping the net threads is enough
-		for (auto & bindAdapter : _bindAdapters)
-		{
-			const vector<HandlePtr> & hds = bindAdapter->getHandles();
-
-			for (uint32_t i = 0; i < hds.size(); ++i)
-			{
-				hds[i]->terminate();
-
-				if(hds[i]->getNetThread()->joinable())
-				{
-					hds[i]->getNetThread()->terminate();
-					hds[i]->getNetThread()->getThreadControl().join();
-				}
-			}
-		}
-	}
+    if(_openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_THREAD || _openCoroutine == TC_EpollServer::NET_THREAD_QUEUE_HANDLES_CO)
+    {
+        // stop the net threads first (if the handle threads/coroutines were stopped first, a late network event could crash the server)
+        for (size_t i = 0; i < _netThreads.size(); ++i)
+        {
+            if (_netThreads[i]->joinable())
+            {
+                _netThreads[i]->terminate();
+            }
+        }
+
+        // stop the handle threads
+        for (auto & bindAdapter : _bindAdapters)
+        {
+            const vector<HandlePtr> & hds = bindAdapter->getHandles();
+
+            for (uint32_t i = 0; i < hds.size(); ++i)
+            {
+                hds[i]->terminate();
+            }
+        }
+
+        _handlePool.stop();
+
+        // join the net threads only after every handle has been told to stop;
+        // joining inside the terminate loop could wait on a thread that other,
+        // not-yet-stopped handles were still feeding
+        for (size_t i = 0; i < _netThreads.size(); ++i)
+        {
+            if (_netThreads[i]->joinable())
+            {
+                _netThreads[i]->getThreadControl().join();
+            }
+        }
+    }
+    else
+    {
+        // merged mode: the net thread and the handle thread are the same thread, so stopping the net threads is enough
+        for (auto & bindAdapter : _bindAdapters)
+        {
+            const vector<HandlePtr> & hds = bindAdapter->getHandles();
+
+            for (uint32_t i = 0; i < hds.size(); ++i)
+            {
+                hds[i]->terminate();
+
+                if(hds[i]->getNetThread()->joinable())
+                {
+                    hds[i]->getNetThread()->terminate();
+                    hds[i]->getNetThread()->getThreadControl().join();
+                }
+            }
+        }
+    }
}
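
The join logic above was reordered by this patch into a two-phase stop: every net thread is signalled first, the handle machinery is stopped, and only then are the net threads joined. Joining inside the signalling loop, as the removed lines did, can block on one thread while not-yet-signalled handles still queue work against it. The pattern in isolation:

    #include <functional>
    #include <thread>
    #include <vector>

    void stopAll(std::vector<std::thread>& workers, std::function<void(size_t)> signalStop)
    {
        for (size_t i = 0; i < workers.size(); ++i)
            signalStop(i);              // phase 1: ask every worker to exit

        for (auto& w : workers)
            if (w.joinable())
                w.join();               // phase 2: now wait for all of them
    }
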
void TC_EpollServer::createEpoll()
{
-	uint32_t maxAllConn = 0;
+    uint32_t maxAllConn = 0;

-	// sum the capacity of the listening sockets
-	for (auto it : _bindAdapters)
-	{
-		if (it->getEndpoint().isTcp())
-		{
-			maxAllConn += it->getMaxConns();
-		}
-		else
-		{
-			maxAllConn++;
-		}
-	}
+    // sum the capacity of the listening sockets
+    for (auto it : _bindAdapters)
+    {
+        if (it->getEndpoint().isTcp())
+        {
+            maxAllConn += it->getMaxConns();
+        }
+        else
+        {
+            maxAllConn++;
+        }
+    }

-	if (maxAllConn >= (1 << 22))
-	{
-		error("createEpoll connection num: " + TC_Common::tostr(maxAllConn) + " >= " + TC_Common::tostr(1 << 22));
-		maxAllConn = (1 << 22) - 1;
-	}
+    if (maxAllConn >= (1 << 22))
+    {
+        error("createEpoll connection num: " + TC_Common::tostr(maxAllConn) + " >= " + TC_Common::tostr(1 << 22));
+        maxAllConn = (1 << 22) - 1;
+    }

-	for (size_t i = 0; i < _netThreads.size(); ++i)
-	{
-		_netThreads[i]->createEpoll(maxAllConn);
-	}
+    for (size_t i = 0; i < _netThreads.size(); ++i)
+    {
+        _netThreads[i]->createEpoll(maxAllConn);
+    }
}

TC_EpollServer::BindAdapterPtr TC_EpollServer::getBindAdapter(const string &sName)
{
-	auto it = _bindAdapters.begin();
-
-	while (it != _bindAdapters.end())
-	{
-		if ((*it)->getName() == sName)
-		{
-			return (*it);
-		}
-
-		++it;
-	}
-	return NULL;
+    auto it = _bindAdapters.begin();
+
+    while (it != _bindAdapters.end())
+    {
+        if ((*it)->getName() == sName)
+        {
+            return (*it);
+        }
+
+        ++it;
+    }
+    return NULL;
}

vector<TC_EpollServer::BindAdapterPtr> TC_EpollServer::getBindAdapters()
{
-	return _bindAdapters;
+    return _bindAdapters;
}

void TC_EpollServer::close(const shared_ptr<TC_EpollServer::RecvContext> &data)
{
-	auto adapter = data->adapter();
-
-	if(adapter && !adapter->isUdp())
-	{
-		// close() only takes effect for non-udp connections
-		adapter->getNetThread(data->threadIndex())->close(data);
-	}
+    auto adapter = data->adapter();
+
+    if(adapter && !adapter->isUdp())
+    {
+        // close() only takes effect for non-udp connections
+        adapter->getNetThread(data->threadIndex())->close(data);
+    }
}

void TC_EpollServer::send(const shared_ptr<SendContext> &data)
{
-	if(data->buffer()->empty())
-		return;
-
-	auto adapter = data->getRecvContext()->adapter();
-
-	if(adapter)
-	{
-		adapter->getNetThread(data->getRecvContext()->threadIndex())->send(data);
-	}
+    if(data->buffer()->empty())
+        return;
+
+    auto adapter = data->getRecvContext()->adapter();
+
+    if(adapter)
+    {
+        adapter->getNetThread(data->getRecvContext()->threadIndex())->send(data);
+    }
}

void TC_EpollServer::debug(const string &s) const
{
-	if (_pLocalLogger)
-	{
-		_pLocalLogger->debug() << "[TARS]" << s << endl;
-	}
+    if (_pLocalLogger)
+    {
+        _pLocalLogger->debug() << "[TARS]" << s << endl;
+    }
}

void TC_EpollServer::info(const string &s) const
{
-	if (_pLocalLogger)
-	{
-		_pLocalLogger->info() << "[TARS]" << s << endl;
-	}
+    if (_pLocalLogger)
+    {
+        _pLocalLogger->info() << "[TARS]" << s << endl;
+    }
}

void TC_EpollServer::tars(const string &s) const
{
-	if (_pLocalLogger)
-	{
-		_pLocalLogger->tars() << "[TARS]" << s << endl;
-	}
+    if (_pLocalLogger)
+    {
+        _pLocalLogger->tars() << "[TARS]" << s << endl;
+    }
}

void TC_EpollServer::error(const string &s) const
{
-	if (_pLocalLogger)
-	{
-		_pLocalLogger->error() << "[TARS]" << s << endl;
-	}
+    if (_pLocalLogger)
+    {
+        _pLocalLogger->error() << "[TARS]" << s << endl;
+    }
}

vector<TC_EpollServer::ConnStatus> TC_EpollServer::getConnStatus(int lfd)
{
-	vector<ConnStatus> vConnStatus;
-	for (size_t i = 0; i < _netThreads.size(); ++i)
-	{
-		vector<ConnStatus> tmp = _netThreads[i]->getConnStatus(lfd);
-		for (size_t k = 0; k < tmp.size(); ++k)
-		{
-			vConnStatus.push_back(tmp[k]);
-		}
-	}
-	return vConnStatus;
+    vector<ConnStatus> vConnStatus;
+    for (size_t i = 0; i < _netThreads.size(); ++i)
+    {
+        vector<ConnStatus> tmp = _netThreads[i]->getConnStatus(lfd);
+        for (size_t k = 0; k < tmp.size(); ++k)
+        {
+            vConnStatus.push_back(tmp[k]);
+        }
+    }
+    return vConnStatus;
}

unordered_map<int, TC_EpollServer::BindAdapterPtr> TC_EpollServer::getListenSocketInfo()
{
-	return _listeners;
+    return _listeners;
}

size_t TC_EpollServer::getConnectionCount()
{
-	size_t iConnTotal = 0;
-	for (size_t i = 0; i < _netThreads.size(); ++i)
-	{
-		iConnTotal += _netThreads[i]->getConnectionCount();
-	}
-	return iConnTotal;
+    size_t iConnTotal = 0;
+    for (size_t i = 0; i < _netThreads.size(); ++i)
+    {
+        iConnTotal += _netThreads[i]->getConnectionCount();
+    }
+    return iConnTotal;
}

size_t TC_EpollServer::getLogicThreadNum()
{
-	size_t iNum = 0;
-
-	for (auto & bindAdapter : _bindAdapters)
-	{
-		iNum += bindAdapter->getHandles().size();
-	}
-	return iNum;
+    size_t iNum = 0;
+
+    for (auto & bindAdapter : _bindAdapters)
+    {
+        iNum += bindAdapter->getHandles().size();
+    }
+    return iNum;
}

void TC_EpollServer::setOpenCoroutine(SERVER_OPEN_COROUTINE openCoroutine)
{
-	if(openCoroutine >= NET_THREAD_QUEUE_HANDLES_THREAD && openCoroutine <= NET_THREAD_MERGE_HANDLES_CO)
-	{
-		_openCoroutine = openCoroutine;
-	}
+    if(openCoroutine >= NET_THREAD_QUEUE_HANDLES_THREAD && openCoroutine <= NET_THREAD_MERGE_HANDLES_CO)
+    {
+        _openCoroutine = openCoroutine;
+    }
}

void TC_EpollServer::setCoroutineStack(uint32_t iPoolSize, size_t iStackSize)
{
-	_iCoroutinePoolSize = iPoolSize;
-	_iCoroutineStackSize = iStackSize;
+    _iCoroutinePoolSize = iPoolSize;
+    _iCoroutineStackSize = iStackSize;
}

}
diff --git a/util/src/tc_epoller.cpp b/util/src/tc_epoller.cpp
index f2e08296..7f0ab4e7 100755
--- a/util/src/tc_epoller.cpp
+++ b/util/src/tc_epoller.cpp
@@ -309,6 +309,7 @@ void TC_Epoller::close()
 }

#if TARGET_PLATFORM_LINUX || TARGET_PLATFORM_IOS
+    LOG_CONSOLE_DEBUG << endl;
 ::close(_iEpollfd);

 _iEpollfd = -1;
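
Since the whole patch exists to stop epoll fds from leaking in coroutine mode, a regression check is easy to sketch: count the process's open fds across repeated server start/terminate cycles and require the count to stay flat. Linux-only and illustrative; the loop body and threshold are placeholders.

    #include <dirent.h>
    #include <cassert>

    static int countOpenFds()
    {
        int n = 0;
        if (DIR* d = opendir("/proc/self/fd"))
        {
            while (readdir(d) != nullptr) ++n;
            closedir(d);   // the DIR's own fd is counted equally on each call
        }
        return n;
    }

    // Before the fix, each cycle stranded the epoll fd created by the net
    // thread's TC_CoroutineScheduler; after the fix the count stays flat.
    void checkNoFdLeak()
    {
        int before = countOpenFds();
        for (int i = 0; i < 10; ++i)
        {
            // start a NET_THREAD_QUEUE_HANDLES_CO server, then terminate it ...
        }
        int after = countOpenFds();
        assert(after <= before);
    }
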