Skip to content

Commit

Permalink
Use robuster approach for thread safety
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Nov 20, 2024
1 parent 3e6561c commit ec02b9f
Showing 1 changed file with 39 additions and 21 deletions.
60 changes: 39 additions & 21 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "util/Exception.h"
#include "util/HashMap.h"
#include "util/JoinAlgorithms/JoinAlgorithms.h"
#include "util/ThreadSafeQueue.h"

using std::endl;
using std::string;
Expand Down Expand Up @@ -418,14 +417,21 @@ ProtoResult Join::createResult(
std::function<void(IdTable&, LocalVocab&)>> auto action,
std::vector<ColumnIndex> permutation) const {
if (requestedLaziness) {
return {
[](auto innerAction, auto innerPermutation) -> Result::Generator {
ad_utility::data_structures::ThreadSafeQueue<Result::IdTableVocabPair>
queue{1};
ad_utility::JThread thread{
[&queue, &innerAction, &innerPermutation]() {
auto addValue = [&queue, &innerPermutation](
return {[](auto innerAction, auto innerPermutation) -> Result::Generator {
std::atomic_flag write = true;
std::variant<std::monostate, Result::IdTableVocabPair,
std::exception_ptr>
storage;
ad_utility::JThread thread{[&write, &storage, &innerAction,
&innerPermutation]() {
auto writeValue = [&write, &storage](auto value) noexcept {
storage = std::move(value);
write.clear();
write.notify_one();
};
auto addValue = [&write, &writeValue, &innerPermutation](
IdTable& idTable, LocalVocab& localVocab) {
AD_CORRECTNESS_CHECK(write.test());
if (idTable.size() < CHUNK_SIZE) {
return;
}
Expand All @@ -434,30 +440,42 @@ ProtoResult Join::createResult(
if (!innerPermutation.empty()) {
pair.idTable_.setColumnSubset(innerPermutation);
}
queue.push(std::move(pair));
writeValue(std::move(pair));
// Wait until we are allowed to write again.
write.wait(false);
};
try {
auto finalValue = innerAction(addValue);
AD_CORRECTNESS_CHECK(write.test());
if (!finalValue.idTable_.empty()) {
if (!innerPermutation.empty()) {
finalValue.idTable_.setColumnSubset(innerPermutation);
}
queue.push(std::move(finalValue));
writeValue(std::move(finalValue));
// Wait until we are allowed to write again.
write.wait(false);
}
queue.finish();
writeValue(std::monostate{});
} catch (...) {
queue.pushException(std::current_exception());
writeValue(std::current_exception());
}
}};
while (true) {
auto val = queue.pop();
if (!val.has_value()) {
break;
}
co_yield val.value();
}
}(std::move(action), std::move(permutation)),
resultSortedOn()};
while (true) {
// Wait for read phase.
write.wait(true);
if (std::holds_alternative<std::monostate>(storage)) {
break;
}
if (std::holds_alternative<std::exception_ptr>(storage)) {
std::rethrow_exception(std::get<std::exception_ptr>(storage));
}
co_yield std::get<Result::IdTableVocabPair>(storage);
// Initiate write phase.
write.test_and_set();
write.notify_one();
}
}(std::move(action), std::move(permutation)),
resultSortedOn()};
} else {
auto [idTable, localVocab] = action(ad_utility::noop);
if (!permutation.empty()) {
Expand Down

0 comments on commit ec02b9f

Please sign in to comment.