Skip to content

Commit

Permalink
Fix request stream recreation on reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
borisov committed May 8, 2020
1 parent d389406 commit ed81b5a
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@
public interface RequestObserver {

StreamObserver<FetchAndLockRequest> getStreamObserver();

void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ public StreamObserver<FetchAndLockRequest> getStreamObserver() {
private StreamObserver<FetchAndLockRequest> buildStreamObserver() {
return stub.getStub().fetchAndLock(responseObserver);
}

@Override
public void reset() {
streamObserver = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.camunda.bpm.engine.grpc.client.subscription.SubscriptionRepository;
import org.camunda.bpm.engine.grpc.client.worker.Handler;
import org.camunda.bpm.engine.grpc.client.worker.Locker;
import org.camunda.bpm.engine.grpc.client.worker.Watchdog;
import org.springframework.stereotype.Component;

@Slf4j
Expand All @@ -16,6 +17,8 @@ public class HandlerImpl implements Handler {

private final Locker locker;

private final Watchdog watchdog;

private final SubscriptionRepository subscriptionRepository;

private final RequestObserver requestObserver;
Expand All @@ -24,25 +27,47 @@ public class HandlerImpl implements Handler {

@Override
public void handle() {
if (!subscriptionRepository.isEmpty()) {
requestObserver.getStreamObserver().onNext(
requestFactory.create()
);

locker.lock();
} else {
log.info("There is no registered external task subscriptions");

try {
Thread.sleep(500);
} catch (InterruptedException e) {
log.error("Thread interrupted", e);
}
if (!handleConnection() || !handleSubscriptions()) {
sleep();
}
}

@Override
public void complete() {
requestObserver.getStreamObserver().onCompleted();
}

private boolean handleConnection() {
if (!watchdog.watch()) {
requestObserver.reset();

return false;
}

return true;
}

private boolean handleSubscriptions() {
if (subscriptionRepository.isEmpty()) {
log.info("There is no registered external task subscriptions");

return false;
}

requestObserver.getStreamObserver().onNext(
requestFactory.create()
);

locker.lock();

return true;
}

private void sleep() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Thread sleep interrupted", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.camunda.bpm.engine.grpc.client.worker.Handler;
import org.camunda.bpm.engine.grpc.client.worker.Watchdog;
import org.camunda.bpm.engine.grpc.client.worker.Worker;
import org.springframework.stereotype.Component;

Expand All @@ -16,8 +15,6 @@ public class WorkerImpl implements Worker {

private final Handler handler;

private final Watchdog watchdog;

private AtomicBoolean isRunning = new AtomicBoolean(false);

private Thread thread = new Thread(this, getClass().getSimpleName());
Expand Down Expand Up @@ -54,11 +51,7 @@ public void stop() throws AlreadyStoppedException {
public void run() {
while (isRunning.get()) {
try {
if (watchdog.watch()) {
handler.handle();
} else {
Thread.sleep(1000);
}
handler.handle();
} catch (Throwable e) {
log.error("Exception while handling worker thread", e);
}
Expand Down

0 comments on commit ed81b5a

Please sign in to comment.