From c88cba6dfeb325f53bb05e66c285e786e1c26a1d Mon Sep 17 00:00:00 2001 From: Sean Gilligan Date: Fri, 22 Sep 2023 19:02:47 -0700 Subject: [PATCH] Deprecate PollingChainTipService, merge with PollingChainTipServiceImpl * Deprecate the interface * Merge private and default methods from interface into implementation --- .../rx/jsonrpc/PollingChainTipService.java | 3 +- .../jsonrpc/PollingChainTipServiceImpl.java | 46 ++++++++++++++++--- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipService.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipService.java index d86a6a643..c5376724b 100644 --- a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipService.java +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipService.java @@ -11,10 +11,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -//TODO: Merge with PollingChainTipServiceImpl /** * Interface with {@link PollingChainTipService#pollForDistinctChainTip()} method. + * @deprecated Use the {@link PollingChainTipServiceImpl} implementation */ +@Deprecated public interface PollingChainTipService extends ChainTipService, ChainTipClient, RxJsonRpcClient { Logger log = LoggerFactory.getLogger(PollingChainTipService.class); diff --git a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java index 180be862e..71415aeeb 100644 --- a/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java +++ b/cj-btc-rx-jsonrpc/src/main/java/org/consensusj/bitcoin/rx/jsonrpc/PollingChainTipServiceImpl.java @@ -1,14 +1,18 @@ package org.consensusj.bitcoin.rx.jsonrpc; -import org.consensusj.bitcoin.json.pojo.ChainTip; +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.processors.BehaviorProcessor; import io.reactivex.rxjava3.processors.FlowableProcessor; +import org.consensusj.bitcoin.json.pojo.ChainTip; import org.consensusj.bitcoin.jsonrpc.BitcoinClient; +import org.consensusj.bitcoin.jsonrpc.ChainTipClient; import org.consensusj.bitcoin.rx.ChainTipService; import org.consensusj.jsonrpc.JsonRpcStatusException; +import org.consensusj.rx.jsonrpc.RxJsonRpcClient; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,12 +23,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -//TODO: Merge with PollingChainTipService /** - * Implementation of {@link PollingChainTipService} using a {@link BitcoinClient} and a polling interval. - * This can be used as a fallback if ZeroMQ is not available + * Provides {@link ChainTipService} a using a {@link BitcoinClient} and a polling interval. + * This can be used as a fallback if ZeroMQ is not available. */ -public class PollingChainTipServiceImpl implements ChainTipService, PollingChainTipService, Closeable { +public class PollingChainTipServiceImpl implements ChainTipService, ChainTipClient, RxJsonRpcClient, Closeable { private static final Logger log = LoggerFactory.getLogger(PollingChainTipServiceImpl.class); private final BitcoinClient client; private final Observable interval; @@ -61,11 +64,42 @@ public Publisher chainTipPublisher() { return chainTipProcessor; } - @Override + /** + * Provide a polling interval + * + * @return polling interval with desired frequency for polling for new ChainTips. + */ public Observable getPollingInterval() { return interval; } + /** + * Using a polling interval provided by {@link #getPollingInterval()} provide a + * stream of distinct {@link ChainTip}s. + * + * @return A stream of distinct {@code ChainTip}s. + */ + public Flowable pollForDistinctChainTip() { + return getPollingInterval() + .doOnNext(t -> log.debug("got interval")) + .flatMapMaybe(t -> this.currentChainTipMaybe()) + .doOnNext(tip -> log.debug("blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash())) + .distinctUntilChanged(ChainTip::getHash) + .doOnNext(tip -> log.info("** NEW ** blockheight, blockhash = {}, {}", tip.getHeight(), tip.getHash())) + // ERROR backpressure strategy is compatible with BehaviorProcessor since it subscribes to MAX items + .toFlowable(BackpressureStrategy.ERROR); + } + + /** + * Get the active chain tip if there is one (useful for polling clients) + * + * @return The active ChainTip if available (onSuccess) otherwise onComplete (if not available) or onError (if error occurred) + */ + private Maybe currentChainTipMaybe() { + return pollOnceAsync(this::getChainTipsAsync) + .mapOptional(ChainTip::findActiveChainTip); + } + @Override public void close() { chainTipSubscription.dispose();