Skip to content

Commit

Permalink
Merge pull request #899 from CesiumGS/all-with-void-futures
Browse files Browse the repository at this point in the history
Support `AsyncSystem::all` on void Futures.
  • Loading branch information
kring authored Sep 30, 2024
2 parents 466e8ad + b61dc5c commit c29f56b
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 21 deletions.
73 changes: 52 additions & 21 deletions CesiumAsync/include/CesiumAsync/AsyncSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <CesiumUtility/Tracing.h>

#include <memory>
#include <type_traits>

namespace CesiumAsync {
class ITaskProcessor;
Expand Down Expand Up @@ -181,10 +182,27 @@ class CESIUMASYNC_API AsyncSystem final {
std::forward<Func>(f))));
}

/**
* @brief The value type of the Future returned by {@link all}.
*
* This will be either `std::vector<T>`, if the input Futures passed to the
* `all` function return values, or `void` if they do not.
*
* @tparam T The value type of the input Futures passed to the function.
*/
template <typename T>
using AllValueType =
std::conditional_t<std::is_void_v<T>, void, std::vector<T>>;

/**
* @brief Creates a Future that resolves when every Future in a vector
* resolves, and rejects when any Future in the vector rejects.
*
* If the input Futures resolve to non-void values, the returned Future
* resolves to a vector of the values, in the same order as the input Futures.
* If the input Futures resolve to void, the returned Future resolves to void
* as well.
*
* If any of the Futures rejects, the returned Future rejects as well. The
* exception included in the rejection will be from the first Future in the
* vector that rejects.
Expand All @@ -199,7 +217,7 @@ class CESIUMASYNC_API AsyncSystem final {
* rejects when any Future in the vector rejects.
*/
template <typename T>
Future<std::vector<T>> all(std::vector<Future<T>>&& futures) const {
Future<AllValueType<T>> all(std::vector<Future<T>>&& futures) const {
return this->all<T, Future<T>>(
std::forward<std::vector<Future<T>>>(futures));
}
Expand All @@ -208,21 +226,26 @@ class CESIUMASYNC_API AsyncSystem final {
* @brief Creates a Future that resolves when every Future in a vector
* resolves, and rejects when any Future in the vector rejects.
*
* If any of the Futures rejects, the returned Future rejects as well. The
* exception included in the rejection will be from the first Future in the
* vector that rejects.
* If the input SharedFutures resolve to non-void values, the returned Future
* resolves to a vector of the values, in the same order as the input
* SharedFutures. If the input SharedFutures resolve to void, the returned
* Future resolves to void as well.
*
* To get detailed rejection information from each of the Futures,
* If any of the SharedFutures rejects, the returned Future rejects as well.
* The exception included in the rejection will be from the first SharedFuture
* in the vector that rejects.
*
* To get detailed rejection information from each of the SharedFutures,
* attach a `catchInMainThread` continuation prior to passing the
* list into `all`.
*
* @tparam T The type that each Future resolves to.
* @param futures The list of futures.
* @return A Future that resolves when all the given Futures resolve, and
* rejects when any Future in the vector rejects.
* @tparam T The type that each SharedFuture resolves to.
* @param futures The list of shared futures.
* @return A Future that resolves when all the given SharedFutures resolve,
* and rejects when any SharedFuture in the vector rejects.
*/
template <typename T>
Future<std::vector<T>> all(std::vector<SharedFuture<T>>&& futures) const {
Future<AllValueType<T>> all(std::vector<SharedFuture<T>>&& futures) const {
return this->all<T, SharedFuture<T>>(
std::forward<std::vector<SharedFuture<T>>>(futures));
}
Expand Down Expand Up @@ -293,7 +316,7 @@ class CESIUMASYNC_API AsyncSystem final {
private:
// Common implementation of 'all' for both Future and SharedFuture.
template <typename T, typename TFutureType>
Future<std::vector<T>> all(std::vector<TFutureType>&& futures) const {
Future<AllValueType<T>> all(std::vector<TFutureType>&& futures) const {
using TTaskType = decltype(TFutureType::_task);
std::vector<TTaskType> tasks;
tasks.reserve(futures.size());
Expand All @@ -304,22 +327,30 @@ class CESIUMASYNC_API AsyncSystem final {

futures.clear();

async::task<std::vector<T>> task =
async::task<AllValueType<T>> task =
async::when_all(tasks.begin(), tasks.end())
.then(
async::inline_scheduler(),
[](std::vector<TTaskType>&& tasks) {
// Get all the results. If any tasks rejected, we'll bail with
// an exception.
std::vector<T> results;
results.reserve(tasks.size());

for (auto it = tasks.begin(); it != tasks.end(); ++it) {
results.emplace_back(it->get());
if constexpr (std::is_void_v<T>) {
// Tasks return void. "Get" each task so that error
// information is propagated.
for (auto it = tasks.begin(); it != tasks.end(); ++it) {
it->get();
}
} else {
// Get all the results. If any tasks rejected, we'll bail
// with an exception.
std::vector<T> results;
results.reserve(tasks.size());

for (auto it = tasks.begin(); it != tasks.end(); ++it) {
results.emplace_back(it->get());
}
return results;
}
return results;
});
return Future<std::vector<T>>(this->_pSchedulers, std::move(task));
return Future<AllValueType<T>>(this->_pSchedulers, std::move(task));
}

std::shared_ptr<CesiumImpl::AsyncSystemSchedulers> _pSchedulers;
Expand Down
56 changes: 56 additions & 0 deletions CesiumAsync/test/TestAsyncSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,30 @@ TEST_CASE("AsyncSystem") {
CHECK(resolved);
}

SECTION("Can use `all` with void-returning Futures") {
auto one = asyncSystem.createPromise<void>();
auto two = asyncSystem.createPromise<void>();
auto three = asyncSystem.createPromise<void>();

std::vector<Future<void>> futures;
futures.emplace_back(one.getFuture());
futures.emplace_back(two.getFuture());
futures.emplace_back(three.getFuture());

Future<void> all = asyncSystem.all(std::move(futures));

bool resolved = false;
Future<void> last =
std::move(all).thenImmediately([&resolved]() { resolved = true; });

three.resolve();
one.resolve();
two.resolve();

last.wait();
CHECK(resolved);
}

SECTION("Future returned by 'all' rejects when any Future rejects") {
auto one = asyncSystem.createPromise<int>();
auto two = asyncSystem.createPromise<int>();
Expand Down Expand Up @@ -493,6 +517,38 @@ TEST_CASE("AsyncSystem") {
CHECK(result[1] == 11);
}

SECTION("can join two shared futures returning void") {
auto promise = asyncSystem.createPromise<void>();
auto sharedFuture = promise.getFuture().share();

bool executed1 = false;
Future<void> one =
sharedFuture.thenInWorkerThread([&executed1]() { CHECK(!executed1); })
.thenInWorkerThread([&executed1]() {
CHECK(!executed1);
executed1 = true;
});

bool executed2 = false;
Future<void> two =
sharedFuture.thenInWorkerThread([&executed2]() { CHECK(!executed2); })
.thenInWorkerThread([&executed2]() {
CHECK(!executed2);
executed2 = true;
});

std::vector<SharedFuture<void>> futures;
futures.emplace_back(std::move(one).share());
futures.emplace_back(std::move(two).share());
Future<void> joined = asyncSystem.all(std::move(futures));

promise.resolve();

joined.wait();
CHECK(executed1);
CHECK(executed2);
}

SECTION("can catch from shared future") {
auto promise = asyncSystem.createPromise<int>();
auto sharedFuture = promise.getFuture().share();
Expand Down

0 comments on commit c29f56b

Please sign in to comment.