diff --git a/Cesium3DTilesSelection/include/Cesium3DTilesSelection/Tileset.h b/Cesium3DTilesSelection/include/Cesium3DTilesSelection/Tileset.h index 1c7897b6b..a346d635e 100644 --- a/Cesium3DTilesSelection/include/Cesium3DTilesSelection/Tileset.h +++ b/Cesium3DTilesSelection/include/Cesium3DTilesSelection/Tileset.h @@ -82,12 +82,11 @@ class RequestDispatcher { void QueueRequestWork( const std::vector& work, const std::vector& passThroughWork, - const std::vector& requestHeaders); + const std::vector& requestHeaders, + size_t maxSimultaneousRequests); void PassThroughWork(const std::vector& work); - void WakeIfNeeded(size_t maxSimultaneousRequests); - void TakeCompletedWork(size_t maxCount, std::vector& out); size_t GetPendingRequestsCount(); @@ -96,18 +95,17 @@ class RequestDispatcher { void GetRequestsStats(size_t& queued, size_t& inFlight, size_t& done); private: + void transitionQueuedWork(); void dispatchRequest(TileLoadWork& request); void stageQueuedWork(std::vector& workNeedingDispatch); void onRequestFinished( uint16_t responseStatusCode, const gsl::span* pResponseData, - const TileLoadWork& request, - std::vector& workNeedingDispatch); + const TileLoadWork& request); // Thread safe members std::mutex _requestsLock; - bool _dispatcherIdle = true; bool _exitSignaled = false; std::vector _queuedWork; std::map> _inFlightWork; @@ -120,6 +118,8 @@ class RequestDispatcher { std::shared_ptr _pLogger; std::vector _requestHeaders; + + size_t _maxSimultaneousRequests; }; /** diff --git a/Cesium3DTilesSelection/src/Tileset.cpp b/Cesium3DTilesSelection/src/Tileset.cpp index b65559d7f..f8c6d5a12 100644 --- a/Cesium3DTilesSelection/src/Tileset.cpp +++ b/Cesium3DTilesSelection/src/Tileset.cpp @@ -1758,9 +1758,8 @@ void Tileset::addWorkToRequestDispatcher( _requestDispatcher.QueueRequestWork( workToSubmit, noUrlWork, - this->_pTilesetContentManager->getRequestHeaders()); - - _requestDispatcher.WakeIfNeeded(maxSimultaneousRequests); + this->_pTilesetContentManager->getRequestHeaders(), + maxSimultaneousRequests); } } @@ -1844,19 +1843,27 @@ RequestDispatcher::~RequestDispatcher() noexcept { void RequestDispatcher::QueueRequestWork( const std::vector& work, const std::vector& passThroughWork, - const std::vector& requestHeaders) { + const std::vector& requestHeaders, + size_t maxSimultaneousRequests) { if (work.empty() && passThroughWork.empty()) return; - std::lock_guard lock(_requestsLock); - _queuedWork.insert(_queuedWork.end(), work.begin(), work.end()); + { + std::lock_guard lock(_requestsLock); + _queuedWork.insert(_queuedWork.end(), work.begin(), work.end()); + + _doneWork.insert( + _doneWork.end(), + passThroughWork.begin(), + passThroughWork.end()); + + _requestHeaders = requestHeaders; - _doneWork.insert( - _doneWork.end(), - passThroughWork.begin(), - passThroughWork.end()); + assert(maxSimultaneousRequests > 0); + _maxSimultaneousRequests = maxSimultaneousRequests; + } - _requestHeaders = requestHeaders; + transitionQueuedWork (); } void RequestDispatcher::PassThroughWork(const std::vector& work) { @@ -1869,8 +1876,7 @@ void RequestDispatcher::PassThroughWork(const std::vector& work) { void RequestDispatcher::onRequestFinished( uint16_t responseStatusCode, const gsl::span* pResponseData, - const TileLoadWork& request, - std::vector& workNeedingDispatch) { + const TileLoadWork& request) { std::lock_guard lock(_requestsLock); if (_exitSignaled) @@ -1906,25 +1912,9 @@ void RequestDispatcher::onRequestFinished( // Remove it _inFlightWork.erase(foundIt); - - // - // We know we have one less in flight request - // Immediately dispatch another if available - // - size_t queueCount = _queuedWork.size(); - if (queueCount == 0) - return; - - // Sort the incoming queue by priority (if more than 1) - if (queueCount > 1) - std::sort(_queuedWork.rbegin(), _queuedWork.rend()); - - // Stage exactly one work item - stageQueuedWork(workNeedingDispatch); } void RequestDispatcher::dispatchRequest(TileLoadWork& request) { - //SPDLOG_LOGGER_INFO(_pLogger, "Send network request: {}", request.requestUrl); // TODO. This uses the externals asset accessor (unreal, gunzip, etc) @@ -1936,23 +1926,15 @@ void RequestDispatcher::dispatchRequest(TileLoadWork& request) { // Add payload to this work const IAssetResponse* pResponse = pCompletedRequest->response(); - std::vector workNeedingDispatch; if (pResponse) { gsl::span data = pResponse->data(); - _this->onRequestFinished( - pResponse->statusCode(), - &data, - _request, - workNeedingDispatch); + _this->onRequestFinished(pResponse->statusCode(), &data, _request); } else { // TODO, how will the consumer of the done request know the error? - _this->onRequestFinished(NULL, 0, _request, workNeedingDispatch); + _this->onRequestFinished(NULL, 0, _request); } - if (!workNeedingDispatch.empty()) { - assert(workNeedingDispatch.size() == 1); - _this->dispatchRequest(*workNeedingDispatch.begin()); - } + _this->transitionQueuedWork(); return pResponse != NULL; }); @@ -2026,63 +2008,37 @@ void RequestDispatcher::TakeCompletedWork( _doneWork.erase(_doneWork.begin(), _doneWork.begin() + numberToTake); } -void RequestDispatcher::WakeIfNeeded(size_t maxSimultaneousRequests) { +void RequestDispatcher::transitionQueuedWork() { + std::vector workNeedingDispatch; { std::lock_guard lock(_requestsLock); - if (!_dispatcherIdle) - return; - _dispatcherIdle = false; - } - _asyncSystem.runInWorkerThread([this, maxSimultaneousRequests]() { - const int throttleTimeInMs = 50; - auto sleepForValue = std::chrono::milliseconds(throttleTimeInMs); + size_t queueCount = _queuedWork.size(); + if (queueCount > 0) { + // We have work to do - while (1) { - // If slots available, we can dispatch some work - size_t slotsAvailable; - { - std::lock_guard lock(_requestsLock); - assert(_inFlightWork.size() <= maxSimultaneousRequests); - slotsAvailable = maxSimultaneousRequests - _inFlightWork.size(); - } - assert(slotsAvailable >= 0); + size_t slotsTotal = _maxSimultaneousRequests; + size_t slotsUsed = _inFlightWork.size(); + if (slotsUsed < slotsTotal) { + // There are free slots + size_t slotsAvailable = slotsTotal - slotsUsed; - std::vector workNeedingDispatch; - if (slotsAvailable > 0) { - std::lock_guard lock(_requestsLock); - - size_t queueCount = _queuedWork.size(); - if (queueCount > 0) { - // Sort our incoming request queue by priority - // Sort descending so highest priority is at back of vector + // Sort our incoming request queue by priority + // Sort descending so highest priority is at back of vector + if (queueCount > 1) std::sort(_queuedWork.rbegin(), _queuedWork.rend()); - // Stage amount of work specified by caller, or whatever is left - size_t dispatchCount = std::min(queueCount, slotsAvailable); + // Stage amount of work specified by caller, or whatever is left + size_t dispatchCount = std::min(queueCount, slotsAvailable); - for (size_t index = 0; index < dispatchCount; ++index) - stageQueuedWork(workNeedingDispatch); - } + for (size_t index = 0; index < dispatchCount; ++index) + stageQueuedWork(workNeedingDispatch); } - - for (TileLoadWork& requestWork : workNeedingDispatch) - dispatchRequest(requestWork); - - // We dispatched as much as possible - // Continue loop until our queue is empty or exit is signaled - { - std::lock_guard lock(_requestsLock); - if (_queuedWork.empty() || _exitSignaled) { - this->_dispatcherIdle = true; - break; - } - } - - // Wait a bit to be friendly to other threads - std::this_thread::sleep_for(sleepForValue); } - }); + } + + for (TileLoadWork& requestWork : workNeedingDispatch) + dispatchRequest(requestWork); } } // namespace Cesium3DTilesSelection