Skip to content

Commit

Permalink
Fix deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
RobinTF committed Nov 20, 2024
1 parent 3bbc466 commit 3e6561c
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,32 +422,33 @@ ProtoResult Join::createResult(
[](auto innerAction, auto innerPermutation) -> Result::Generator {
ad_utility::data_structures::ThreadSafeQueue<Result::IdTableVocabPair>
queue{1};
ad_utility::JThread{[&queue, &innerAction, &innerPermutation]() {
auto addValue = [&queue, &innerPermutation](
IdTable& idTable, LocalVocab& localVocab) {
if (idTable.size() < CHUNK_SIZE) {
return;
}
Result::IdTableVocabPair pair{std::move(idTable),
std::move(localVocab)};
if (!innerPermutation.empty()) {
pair.idTable_.setColumnSubset(innerPermutation);
}
queue.push(std::move(pair));
};
try {
auto finalValue = innerAction(addValue);
if (!finalValue.idTable_.empty()) {
if (!innerPermutation.empty()) {
finalValue.idTable_.setColumnSubset(innerPermutation);
ad_utility::JThread thread{
[&queue, &innerAction, &innerPermutation]() {
auto addValue = [&queue, &innerPermutation](
IdTable& idTable, LocalVocab& localVocab) {
if (idTable.size() < CHUNK_SIZE) {
return;
}
Result::IdTableVocabPair pair{std::move(idTable),
std::move(localVocab)};
if (!innerPermutation.empty()) {
pair.idTable_.setColumnSubset(innerPermutation);
}
queue.push(std::move(pair));
};
try {
auto finalValue = innerAction(addValue);
if (!finalValue.idTable_.empty()) {
if (!innerPermutation.empty()) {
finalValue.idTable_.setColumnSubset(innerPermutation);
}
queue.push(std::move(finalValue));
}
queue.finish();
} catch (...) {
queue.pushException(std::current_exception());
}

Check warning on line 450 in src/engine/Join.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Join.cpp#L449-L450

Added lines #L449 - L450 were not covered by tests
queue.push(std::move(finalValue));
}
queue.finish();
} catch (...) {
queue.pushException(std::current_exception());
}
}};
}};
while (true) {
auto val = queue.pop();
if (!val.has_value()) {
Expand Down

0 comments on commit 3e6561c

Please sign in to comment.