Skip to content

Commit

Permalink
Put startWorker + sync behind lock to avoid race
Browse files Browse the repository at this point in the history
Co-Authored-By: Max Kießling <[email protected]>
  • Loading branch information
s1ck and DarthMax committed Nov 15, 2023
1 parent 20d4c0d commit 2d6a451
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions core/src/main/java/org/neo4j/gds/core/concurrency/SyncBarrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class SyncBarrier {

private final AtomicInteger workerCount;
private final AtomicBoolean isSyncing;
private final ReentrantLock lock;
private final BackoffIdleStrategy idleStrategy;
private final Runnable rejectAction;

Expand All @@ -42,24 +44,41 @@ public static SyncBarrier create(Runnable rejectAction) {
private SyncBarrier(Runnable rejectAction) {
this.workerCount = new AtomicInteger(0);
this.isSyncing = new AtomicBoolean(false);
this.lock = new ReentrantLock(true);
this.idleStrategy = new BackoffIdleStrategy();
this.rejectAction = rejectAction;
}

public void startWorker() {
if (isSyncing.get()) {
this.rejectAction.run();
try {
// Checking the sync flag and increment the worker count must be atomic.
// Otherwise, we could run into the situation where thread A passes
// the sync check, is paused and thread B is executing the sync() method.
// If thread A is resumed after sync() is complete, it will violate the
// sync boundary.
this.lock.lock();
if (this.isSyncing.get()) {
this.rejectAction.run();
}
this.workerCount.incrementAndGet();
} finally {
this.lock.unlock();
}
workerCount.incrementAndGet();
}

public void stopWorker() {
workerCount.decrementAndGet();
this.workerCount.decrementAndGet();
}

public void sync() {
this.isSyncing.set(true);
// wait for import processes to finish
try {
this.lock.lock();
this.isSyncing.set(true);
} finally {
this.lock.unlock();
}

// Wait for all workers to finish.
while (workerCount.get() > 0) {
idleStrategy.idle();
}
Expand Down

0 comments on commit 2d6a451

Please sign in to comment.