Remove polling from request dispatcher
csciguy8 committed Dec 13, 2023
1 parent c4de038 commit 78ac6fe
Showing 2 changed files with 49 additions and 93 deletions.
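For context: before this commit, the dispatcher relied on polling. WakeIfNeeded() spun up a worker thread that looped, filled free request slots, and slept 50 ms between passes. After this commit the loop is gone; queuing work and finishing a request each call transitionQueuedWork(), which fills free slots immediately. A simplified before/after sketch (names match the diff below; bodies are abbreviated):

// Before (removed): a worker thread polls for free slots.
void RequestDispatcher::WakeIfNeeded(size_t maxSimultaneousRequests) {
  _asyncSystem.runInWorkerThread([this, maxSimultaneousRequests]() {
    while (1) {
      // ...dispatch queued work into free request slots...
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
  });
}

// After (added): purely event-driven. Called from QueueRequestWork()
// and from every request-completion callback; no thread, no sleep.
void RequestDispatcher::transitionQueuedWork();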
12 changes: 6 additions & 6 deletions Cesium3DTilesSelection/include/Cesium3DTilesSelection/Tileset.h
@@ -82,12 +82,11 @@ class RequestDispatcher {
   void QueueRequestWork(
       const std::vector<TileLoadWork>& work,
       const std::vector<TileLoadWork>& passThroughWork,
-      const std::vector<CesiumAsync::IAssetAccessor::THeader>& requestHeaders);
+      const std::vector<CesiumAsync::IAssetAccessor::THeader>& requestHeaders,
+      size_t maxSimultaneousRequests);
 
   void PassThroughWork(const std::vector<TileLoadWork>& work);
 
-  void WakeIfNeeded(size_t maxSimultaneousRequests);
-
   void TakeCompletedWork(size_t maxCount, std::vector<TileLoadWork>& out);
 
   size_t GetPendingRequestsCount();
@@ -96,18 +95,17 @@
   void GetRequestsStats(size_t& queued, size_t& inFlight, size_t& done);
 
 private:
+  void transitionQueuedWork();
   void dispatchRequest(TileLoadWork& request);
   void stageQueuedWork(std::vector<TileLoadWork>& workNeedingDispatch);
 
   void onRequestFinished(
       uint16_t responseStatusCode,
       const gsl::span<const std::byte>* pResponseData,
-      const TileLoadWork& request,
-      std::vector<TileLoadWork>& workNeedingDispatch);
+      const TileLoadWork& request);
 
   // Thread safe members
   std::mutex _requestsLock;
-  bool _dispatcherIdle = true;
   bool _exitSignaled = false;
   std::vector<TileLoadWork> _queuedWork;
   std::map<std::string, std::vector<TileLoadWork>> _inFlightWork;
@@ -120,6 +118,8 @@ class RequestDispatcher {
   std::shared_ptr<spdlog::logger> _pLogger;
 
   std::vector<CesiumAsync::IAssetAccessor::THeader> _requestHeaders;
+
+  size_t _maxSimultaneousRequests;
 };
 
 /**
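The call-site change in Tileset.cpp (next file) follows directly from the new header: the request budget now rides along with the queued work, and the separate wake-up call disappears. A hedged sketch of the new call shape, with workToSubmit, noUrlWork, and maxSimultaneousRequests as they appear in addWorkToRequestDispatcher below:

// New single-call shape; the old pattern was QueueRequestWork(...)
// followed by a separate WakeIfNeeded(maxSimultaneousRequests).
_requestDispatcher.QueueRequestWork(
    workToSubmit,  // work that needs a network request
    noUrlWork,     // pass-through work, goes straight to the done list
    this->_pTilesetContentManager->getRequestHeaders(),
    maxSimultaneousRequests);  // stored as _maxSimultaneousRequests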
130 changes: 43 additions & 87 deletions Cesium3DTilesSelection/src/Tileset.cpp
@@ -1758,9 +1758,8 @@ void Tileset::addWorkToRequestDispatcher(
     _requestDispatcher.QueueRequestWork(
         workToSubmit,
         noUrlWork,
-        this->_pTilesetContentManager->getRequestHeaders());
-
-    _requestDispatcher.WakeIfNeeded(maxSimultaneousRequests);
+        this->_pTilesetContentManager->getRequestHeaders(),
+        maxSimultaneousRequests);
   }
 }
 
@@ -1844,19 +1843,27 @@ RequestDispatcher::~RequestDispatcher() noexcept {
 void RequestDispatcher::QueueRequestWork(
     const std::vector<TileLoadWork>& work,
     const std::vector<TileLoadWork>& passThroughWork,
-    const std::vector<CesiumAsync::IAssetAccessor::THeader>& requestHeaders) {
+    const std::vector<CesiumAsync::IAssetAccessor::THeader>& requestHeaders,
+    size_t maxSimultaneousRequests) {
   if (work.empty() && passThroughWork.empty())
     return;
 
-  std::lock_guard<std::mutex> lock(_requestsLock);
-  _queuedWork.insert(_queuedWork.end(), work.begin(), work.end());
+  {
+    std::lock_guard<std::mutex> lock(_requestsLock);
+    _queuedWork.insert(_queuedWork.end(), work.begin(), work.end());
 
-  _doneWork.insert(
-      _doneWork.end(),
-      passThroughWork.begin(),
-      passThroughWork.end());
+    _doneWork.insert(
+        _doneWork.end(),
+        passThroughWork.begin(),
+        passThroughWork.end());
 
-  _requestHeaders = requestHeaders;
+    _requestHeaders = requestHeaders;
+
+    assert(maxSimultaneousRequests > 0);
+    _maxSimultaneousRequests = maxSimultaneousRequests;
+  }
+
+  transitionQueuedWork();
 }
 
 void RequestDispatcher::PassThroughWork(const std::vector<TileLoadWork>& work) {
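Note the shape of the new QueueRequestWork: all shared state is mutated inside a braced scope holding _requestsLock, and transitionQueuedWork() runs only after the lock_guard is destroyed. The ordering matters because transitionQueuedWork() acquires the same mutex, and std::mutex is non-recursive. A minimal sketch of the pattern:

// Mutate under the lock, then kick the dispatcher with the lock released;
// transitionQueuedWork() takes _requestsLock itself, so calling it inside
// the scope below would self-deadlock.
{
  std::lock_guard<std::mutex> lock(_requestsLock);
  // ...append to _queuedWork / _doneWork, record headers and limits...
}
transitionQueuedWork();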
@@ -1869,8 +1876,7 @@ void RequestDispatcher::PassThroughWork(const std::vector<TileLoadWork>& work) {
 void RequestDispatcher::onRequestFinished(
     uint16_t responseStatusCode,
     const gsl::span<const std::byte>* pResponseData,
-    const TileLoadWork& request,
-    std::vector<TileLoadWork>& workNeedingDispatch) {
+    const TileLoadWork& request) {
   std::lock_guard<std::mutex> lock(_requestsLock);
 
   if (_exitSignaled)
@@ -1906,25 +1912,9 @@
 
   // Remove it
   _inFlightWork.erase(foundIt);
-
-  //
-  // We know we have one less in flight request
-  // Immediately dispatch another if available
-  //
-  size_t queueCount = _queuedWork.size();
-  if (queueCount == 0)
-    return;
-
-  // Sort the incoming queue by priority (if more than 1)
-  if (queueCount > 1)
-    std::sort(_queuedWork.rbegin(), _queuedWork.rend());
-
-  // Stage exactly one work item
-  stageQueuedWork(workNeedingDispatch);
 }
 
 void RequestDispatcher::dispatchRequest(TileLoadWork& request) {
-
   //SPDLOG_LOGGER_INFO(_pLogger, "Send network request: {}", request.requestUrl);
 
   // TODO. This uses the externals asset accessor (unreal, gunzip, etc)
@@ -1936,23 +1926,15 @@
         // Add payload to this work
         const IAssetResponse* pResponse = pCompletedRequest->response();
 
-        std::vector<TileLoadWork> workNeedingDispatch;
         if (pResponse) {
           gsl::span<const std::byte> data = pResponse->data();
-          _this->onRequestFinished(
-              pResponse->statusCode(),
-              &data,
-              _request,
-              workNeedingDispatch);
+          _this->onRequestFinished(pResponse->statusCode(), &data, _request);
         } else {
           // TODO, how will the consumer of the done request know the error?
-          _this->onRequestFinished(NULL, 0, _request, workNeedingDispatch);
+          _this->onRequestFinished(NULL, 0, _request);
         }
 
-        if (!workNeedingDispatch.empty()) {
-          assert(workNeedingDispatch.size() == 1);
-          _this->dispatchRequest(*workNeedingDispatch.begin());
-        }
+        _this->transitionQueuedWork();
 
         return pResponse != NULL;
       });
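The completion lambda is where the polling removal pays off. Previously, onRequestFinished() staged exactly one replacement item that the callback then dispatched; now the callback does its bookkeeping and lets transitionQueuedWork() refill however many slots are free. A condensed view of the new flow (comments are mine; statusCode stands in for the branch on pResponse shown above):

// New completion flow, condensed:
_this->onRequestFinished(statusCode, &data, _request); // under _requestsLock:
                                                       // record the result and
                                                       // erase from _inFlightWork
_this->transitionQueuedWork();                         // lock released: refill
                                                       // every free slot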
@@ -2026,63 +2008,37 @@ void RequestDispatcher::TakeCompletedWork(
   _doneWork.erase(_doneWork.begin(), _doneWork.begin() + numberToTake);
 }
 
-void RequestDispatcher::WakeIfNeeded(size_t maxSimultaneousRequests) {
+void RequestDispatcher::transitionQueuedWork() {
+  std::vector<TileLoadWork> workNeedingDispatch;
   {
     std::lock_guard<std::mutex> lock(_requestsLock);
-    if (!_dispatcherIdle)
-      return;
-    _dispatcherIdle = false;
-  }
 
-  _asyncSystem.runInWorkerThread([this, maxSimultaneousRequests]() {
-    const int throttleTimeInMs = 50;
-    auto sleepForValue = std::chrono::milliseconds(throttleTimeInMs);
+    size_t queueCount = _queuedWork.size();
+    if (queueCount > 0) {
+      // We have work to do
 
-    while (1) {
-      // If slots available, we can dispatch some work
-      size_t slotsAvailable;
-      {
-        std::lock_guard<std::mutex> lock(_requestsLock);
-        assert(_inFlightWork.size() <= maxSimultaneousRequests);
-        slotsAvailable = maxSimultaneousRequests - _inFlightWork.size();
-      }
-      assert(slotsAvailable >= 0);
+      size_t slotsTotal = _maxSimultaneousRequests;
+      size_t slotsUsed = _inFlightWork.size();
+      if (slotsUsed < slotsTotal) {
+        // There are free slots
+        size_t slotsAvailable = slotsTotal - slotsUsed;
 
-      std::vector<TileLoadWork> workNeedingDispatch;
-      if (slotsAvailable > 0) {
-        std::lock_guard<std::mutex> lock(_requestsLock);
-        size_t queueCount = _queuedWork.size();
-        if (queueCount > 0) {
-          // Sort our incoming request queue by priority
-          // Sort descending so highest priority is at back of vector
-          if (queueCount > 1)
-            std::sort(_queuedWork.rbegin(), _queuedWork.rend());
+        // Sort our incoming request queue by priority
+        // Sort descending so highest priority is at back of vector
+        if (queueCount > 1)
+          std::sort(_queuedWork.rbegin(), _queuedWork.rend());
 
-          // Stage amount of work specified by caller, or whatever is left
-          size_t dispatchCount = std::min(queueCount, slotsAvailable);
+        // Stage amount of work specified by caller, or whatever is left
+        size_t dispatchCount = std::min(queueCount, slotsAvailable);
 
-          for (size_t index = 0; index < dispatchCount; ++index)
-            stageQueuedWork(workNeedingDispatch);
-        }
-      }
+        for (size_t index = 0; index < dispatchCount; ++index)
+          stageQueuedWork(workNeedingDispatch);
+      }
+    }
+  }
 
-      for (TileLoadWork& requestWork : workNeedingDispatch)
-        dispatchRequest(requestWork);
-
-      // We dispatched as much as possible
-      // Continue loop until our queue is empty or exit is signaled
-      {
-        std::lock_guard<std::mutex> lock(_requestsLock);
-        if (_queuedWork.empty() || _exitSignaled) {
-          this->_dispatcherIdle = true;
-          break;
-        }
-      }
-
-      // Wait a bit to be friendly to other threads
-      std::this_thread::sleep_for(sleepForValue);
-    }
-  });
-}
+  for (TileLoadWork& requestWork : workNeedingDispatch)
+    dispatchRequest(requestWork);
+}
 
 } // namespace Cesium3DTilesSelection
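One design point worth calling out in transitionQueuedWork(): work is only collected into workNeedingDispatch while _requestsLock is held, and the dispatchRequest() calls happen after the scope closes. Dispatching starts network requests whose completion callbacks re-enter transitionQueuedWork(), so issuing them under the lock could deadlock and would serialize request startup. The skeleton, reduced to its locking structure:

void RequestDispatcher::transitionQueuedWork() {
  std::vector<TileLoadWork> workNeedingDispatch;
  {
    std::lock_guard<std::mutex> lock(_requestsLock);
    // ...sort _queuedWork by priority, stage up to
    // (_maxSimultaneousRequests - _inFlightWork.size()) items...
  }
  // Lock released: safe to start requests, whose callbacks re-enter
  // transitionQueuedWork() and take the lock again.
  for (TileLoadWork& requestWork : workNeedingDispatch)
    dispatchRequest(requestWork);
}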
