Skip to content

Commit

Permalink
Fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Nov 18, 2024
1 parent 711c3a7 commit 08be2d8
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,14 @@ void Join::join(const IdTable& a, ColumnIndex jc1, const IdTable& b,
// _____________________________________________________________________________
ProtoResult Join::createResult(bool requestedLaziness, auto action) const {
if (requestedLaziness) {
auto startProcessing = std::make_shared<std::atomic_flag>(false);
auto queue = std::make_shared<
ad_utility::data_structures::ThreadSafeQueue<Result::IdTableVocabPair>>(
1);
ad_utility::JThread{[queue, action = std::move(action)]() {
ad_utility::JThread{[startProcessing, queue, action = std::move(action)]() {
// Don't start processing until the main thread has reached the generator
// to avoid race conditions.
startProcessing->wait(false);
auto addValue = [&queue](Result::IdTableVocabPair value) {
if (value.idTable_.empty()) {
return;
Expand All @@ -425,15 +429,17 @@ ProtoResult Join::createResult(bool requestedLaziness, auto action) const {
queue->pushException(std::current_exception());
}
}}.detach();
return {[](auto queue) -> Result::Generator {
return {[](auto queue, auto startProcessing) -> Result::Generator {
startProcessing->test_and_set();
startProcessing->notify_one();
while (true) {
auto val = queue->pop();
if (!val.has_value()) {
break;
}
co_yield val.value();
}
}(std::move(queue)),
}(std::move(queue), std::move(startProcessing)),
resultSortedOn()};
} else {
auto [idTable, localVocab] = action(ad_utility::noop);
Expand Down

0 comments on commit 08be2d8

Please sign in to comment.