Skip to content

Commit

Permalink
Deprecate PollingChainTipService, merge with PollingChainTipServiceImpl
Browse files Browse the repository at this point in the history
* Deprecate the interface
* Merge private and default methods from interface into implementation
  • Loading branch information
msgilligan committed Sep 23, 2023
1 parent c90bb8c commit c88cba6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

//TODO: Merge with PollingChainTipServiceImpl
/**
* Interface with {@link PollingChainTipService#pollForDistinctChainTip()} method.
* @deprecated Use the {@link PollingChainTipServiceImpl} implementation
*/
@Deprecated
public interface PollingChainTipService extends ChainTipService, ChainTipClient, RxJsonRpcClient {
Logger log = LoggerFactory.getLogger(PollingChainTipService.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package org.consensusj.bitcoin.rx.jsonrpc;

import org.consensusj.bitcoin.json.pojo.ChainTip;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.processors.BehaviorProcessor;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import org.consensusj.bitcoin.json.pojo.ChainTip;
import org.consensusj.bitcoin.jsonrpc.BitcoinClient;
import org.consensusj.bitcoin.jsonrpc.ChainTipClient;
import org.consensusj.bitcoin.rx.ChainTipService;
import org.consensusj.jsonrpc.JsonRpcStatusException;
import org.consensusj.rx.jsonrpc.RxJsonRpcClient;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -19,12 +23,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

//TODO: Merge with PollingChainTipService
/**
* Implementation of {@link PollingChainTipService} using a {@link BitcoinClient} and a polling interval.
* This can be used as a fallback if ZeroMQ is not available
* Provides {@link ChainTipService} a using a {@link BitcoinClient} and a polling interval.
* This can be used as a fallback if ZeroMQ is not available.
*/
public class PollingChainTipServiceImpl implements ChainTipService, PollingChainTipService, Closeable {
public class PollingChainTipServiceImpl implements ChainTipService, ChainTipClient, RxJsonRpcClient, Closeable {
private static final Logger log = LoggerFactory.getLogger(PollingChainTipServiceImpl.class);
private final BitcoinClient client;
private final Observable<Long> interval;
Expand Down Expand Up @@ -61,11 +64,42 @@ public Publisher<ChainTip> chainTipPublisher() {
return chainTipProcessor;
}

@Override
/**
* Provide a polling interval
*
* @return polling interval with desired frequency for polling for new ChainTips.
*/
public Observable<Long> getPollingInterval() {
return interval;
}

/**
* Using a polling interval provided by {@link #getPollingInterval()} provide a
* stream of distinct {@link ChainTip}s.
*
* @return A stream of distinct {@code ChainTip}s.
*/
public Flowable<ChainTip> pollForDistinctChainTip() {
return getPollingInterval()
.doOnNext(t -> log.debug("got interval"))
.flatMapMaybe(t -> this.currentChainTipMaybe())
.doOnNext(tip -> log.debug("blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash()))
.distinctUntilChanged(ChainTip::getHash)
.doOnNext(tip -> log.info("** NEW ** blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash()))
// ERROR backpressure strategy is compatible with BehaviorProcessor since it subscribes to MAX items
.toFlowable(BackpressureStrategy.ERROR);
}

/**
* Get the active chain tip if there is one (useful for polling clients)
*
* @return The active ChainTip if available (onSuccess) otherwise onComplete (if not available) or onError (if error occurred)
*/
private Maybe<ChainTip> currentChainTipMaybe() {
return pollOnceAsync(this::getChainTipsAsync)
.mapOptional(ChainTip::findActiveChainTip);
}

@Override
public void close() {
chainTipSubscription.dispose();
Expand Down

0 comments on commit c88cba6

Please sign in to comment.