Skip to content

Commit

Permalink
Chain request dispatch after one completes. Add between frame buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
csciguy8 committed Dec 13, 2023
1 parent 8fef19e commit c6b5dca
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,13 @@ class RequestDispatcher {

private:
void dispatchRequest(TileLoadWork& request);
void
stageRequestWork(size_t dispatchCount, std::vector<TileLoadWork>& stagedWork);
void stageQueuedWork(std::vector<TileLoadWork>& workNeedingDispatch);

void onRequestFinished(
uint16_t responseStatusCode,
const gsl::span<const std::byte>* pResponseData,
const TileLoadWork& request);
const TileLoadWork& request,
std::vector<TileLoadWork>& workNeedingDispatch);

// Thread safe members
std::mutex _requestsLock;
Expand Down
120 changes: 73 additions & 47 deletions Cesium3DTilesSelection/src/Tileset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,12 +1720,16 @@ void Tileset::addWorkToRequestDispatcher(
// We're always going to process no-url work. Mark it as loading
markWorkTilesAsLoading(noUrlWork);

// Figure out how much url work we will accept
// Figure out how much url work we will accept.
// We want some work to be ready to go in between frames
// so the dispatcher doesn't starve while we wait for a tick
size_t betweenFrameBuffer = 10;
size_t maxCountToQueue = maxSimultaneousRequests + betweenFrameBuffer;
size_t pendingRequestCount =
this->_requestDispatcher.GetPendingRequestsCount();
assert(pendingRequestCount <= maxSimultaneousRequests);
assert(pendingRequestCount <= maxCountToQueue);

size_t slotsOpen = maxSimultaneousRequests - pendingRequestCount;
size_t slotsOpen = maxCountToQueue - pendingRequestCount;
if (slotsOpen == 0) {
// No request slots open, we can at least insert our no url work
_requestDispatcher.PassThroughWork(noUrlWork);
Expand Down Expand Up @@ -1865,7 +1869,8 @@ void RequestDispatcher::PassThroughWork(const std::vector<TileLoadWork>& work) {
void RequestDispatcher::onRequestFinished(
uint16_t responseStatusCode,
const gsl::span<const std::byte>* pResponseData,
const TileLoadWork& request) {
const TileLoadWork& request,
std::vector<TileLoadWork>& workNeedingDispatch) {
std::lock_guard<std::mutex> lock(_requestsLock);

if (_exitSignaled)
Expand Down Expand Up @@ -1901,6 +1906,21 @@ void RequestDispatcher::onRequestFinished(

// Remove it
_inFlightWork.erase(foundIt);

//
// We know we have one less in flight request
// Immediately dispatch another if available
//
size_t queueCount = _queuedWork.size();
if (queueCount == 0)
return;

// Sort the incoming queue by priority (if more than 1)
if (queueCount > 1)
std::sort(_queuedWork.rbegin(), _queuedWork.rend());

// Stage exactly one work item
stageQueuedWork(workNeedingDispatch);
}

void RequestDispatcher::dispatchRequest(TileLoadWork& request) {
Expand All @@ -1916,56 +1936,49 @@ void RequestDispatcher::dispatchRequest(TileLoadWork& request) {
// Add payload to this work
const IAssetResponse* pResponse = pCompletedRequest->response();

std::vector<TileLoadWork> workNeedingDispatch;
if (pResponse) {
gsl::span<const std::byte> data = pResponse->data();
_this->onRequestFinished(pResponse->statusCode(), &data, _request);
_this->onRequestFinished(
pResponse->statusCode(),
&data,
_request,
workNeedingDispatch);
} else {
// TODO, how will the consumer of the done request know the error?
_this->onRequestFinished(NULL, 0, _request);
_this->onRequestFinished(NULL, 0, _request, workNeedingDispatch);
}

if (!workNeedingDispatch.empty()) {
assert(workNeedingDispatch.size() == 1);
_this->dispatchRequest(*workNeedingDispatch.begin());
}

return pResponse != NULL;
});
}

void RequestDispatcher::stageRequestWork(
size_t availableSlotCount,
std::vector<TileLoadWork>& stagedWork) {
std::lock_guard<std::mutex> lock(_requestsLock);

size_t queueCount = _queuedWork.size();
if (queueCount == 0)
return;

// Sort our incoming request queue by priority
// Sort descending so highest priority is at back of vector
std::sort(_queuedWork.rbegin(), _queuedWork.rend());
void RequestDispatcher::stageQueuedWork(
std::vector<TileLoadWork>& workNeedingDispatch) {
// Take from back of queue (highest priority).
assert(_queuedWork.size() > 0);
TileLoadWork request = _queuedWork.back();
_queuedWork.pop_back();

// Stage amount of work specified by caller, or whatever is left
size_t dispatchCount = std::min(queueCount, availableSlotCount);

for (size_t index = 0; index < dispatchCount; ++index) {

// Take from back of queue (highest priority).
assert(_queuedWork.size() > 0);
TileLoadWork request = _queuedWork.back();
_queuedWork.pop_back();

// Move to in flight registry
std::map<std::string, std::vector<TileLoadWork>>::iterator foundIt;
foundIt = _inFlightWork.find(request.requestUrl);
if (foundIt == _inFlightWork.end()) {
// Request doesn't exist, set up a new one
std::vector<TileLoadWork> newWorkVec;
newWorkVec.push_back(request);
_inFlightWork[request.requestUrl] = newWorkVec;

// Copy to our output vector
stagedWork.push_back(request);
} else {
// Tag on to an existing request. Don't bother staging it. Already is.
foundIt->second.push_back(request);
}
// Move to in flight registry
std::map<std::string, std::vector<TileLoadWork>>::iterator foundIt;
foundIt = _inFlightWork.find(request.requestUrl);
if (foundIt == _inFlightWork.end()) {
// Request doesn't exist, set up a new one
std::vector<TileLoadWork> newWorkVec;
newWorkVec.push_back(request);
_inFlightWork[request.requestUrl] = newWorkVec;

// Copy to our output vector
workNeedingDispatch.push_back(request);
} else {
// Tag on to an existing request. Don't bother staging it. Already is.
foundIt->second.push_back(request);
}
}

Expand Down Expand Up @@ -2035,14 +2048,27 @@ void RequestDispatcher::WakeIfNeeded(size_t maxSimultaneousRequests) {
}
assert(slotsAvailable >= 0);

std::vector<TileLoadWork> workNeedingDispatch;
if (slotsAvailable > 0) {
std::vector<TileLoadWork> stagedWork;
stageRequestWork(slotsAvailable, stagedWork);
std::lock_guard<std::mutex> lock(_requestsLock);

size_t queueCount = _queuedWork.size();
if (queueCount > 0) {
// Sort our incoming request queue by priority
// Sort descending so highest priority is at back of vector
std::sort(_queuedWork.rbegin(), _queuedWork.rend());

for (TileLoadWork& requestWork : stagedWork)
dispatchRequest(requestWork);
// Stage amount of work specified by caller, or whatever is left
size_t dispatchCount = std::min(queueCount, slotsAvailable);

for (size_t index = 0; index < dispatchCount; ++index)
stageQueuedWork(workNeedingDispatch);
}
}

for (TileLoadWork& requestWork : workNeedingDispatch)
dispatchRequest(requestWork);

// We dispatched as much as possible
// Continue loop until our queue is empty or exit is signaled
{
Expand Down

0 comments on commit c6b5dca

Please sign in to comment.