diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt
index 6ea49bf98..15a518bdb 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt
@@ -28,7 +28,7 @@ import io.emeraldpay.dshackle.reader.StandardRpcReader
import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder
import io.emeraldpay.dshackle.upstream.ForkWatchFactory
import io.emeraldpay.dshackle.upstream.Head
-import io.emeraldpay.dshackle.upstream.MergedHead
+import io.emeraldpay.dshackle.upstream.MergedPowHead
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinCacheUpdate
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcHead
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream
@@ -188,7 +188,7 @@ open class ConfiguredUpstreams(
val head: Head = conn.zeroMq?.let { zeroMq ->
val server = ZMQServer(zeroMq.host, zeroMq.port, "hashblock")
val zeroMqHead = BitcoinZMQHead(server, directApi, extractBlock)
- MergedHead(listOf(rpcHead, zeroMqHead))
+ MergedPowHead(listOf(rpcHead, zeroMqHead))
} ?: rpcHead
val subscriptions = conn.zeroMq?.let { zeroMq ->
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt
deleted file mode 100644
index 57a65ca1e..000000000
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright (c) 2020 EmeraldPay, Inc
- * Copyright (c) 2019 ETCDEV GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.emeraldpay.dshackle.upstream
-
-import io.emeraldpay.dshackle.cache.Caches
-import io.emeraldpay.dshackle.cache.CachesEnabled
-import org.springframework.context.Lifecycle
-import reactor.core.Disposable
-import reactor.core.publisher.Flux
-
-class MergedHead(
- private val sources: Iterable
-) : AbstractHead(), Lifecycle, CachesEnabled {
-
- private var subscription: Disposable? = null
-
- override fun isRunning(): Boolean {
- return subscription != null
- }
-
- override fun start() {
- sources.forEach { head ->
- if (head is Lifecycle && !head.isRunning) {
- head.start()
- }
- }
- subscription?.dispose()
- subscription = super.follow(Flux.merge(sources.map { it.getFlux() }))
- }
-
- override fun stop() {
- sources.forEach { head ->
- if (head is Lifecycle && head.isRunning) {
- head.stop()
- }
- }
- subscription?.dispose()
- subscription = null
- }
-
- override fun setCaches(caches: Caches) {
- sources.forEach {
- if (it is CachesEnabled) {
- it.setCaches(caches)
- }
- }
- }
-}
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedPosHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedPosHead.kt
new file mode 100644
index 000000000..0f93113a1
--- /dev/null
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedPosHead.kt
@@ -0,0 +1,128 @@
+package io.emeraldpay.dshackle.upstream
+
+import io.emeraldpay.dshackle.cache.Caches
+import io.emeraldpay.dshackle.cache.CachesEnabled
+import io.emeraldpay.dshackle.data.BlockContainer
+import org.slf4j.LoggerFactory
+import org.springframework.context.Lifecycle
+import reactor.core.Disposable
+import reactor.core.publisher.Flux
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.function.Function
+import kotlin.concurrent.read
+import kotlin.concurrent.write
+
+class MergedPosHead(
+ private val sources: Iterable>
+) : AbstractHead(), Lifecycle, CachesEnabled {
+
+ companion object {
+ private val log = LoggerFactory.getLogger(MergedPosHead::class.java)
+ }
+ private var subscription: Disposable? = null
+
+ private val lock = ReentrantReadWriteLock()
+ private val headLimit = 16
+ private var head: List> = emptyList()
+
+ override fun isRunning(): Boolean {
+ return subscription != null
+ }
+
+ override fun start() {
+ sources.forEach {
+ val head = it.second
+ if (head is Lifecycle && !head.isRunning) {
+ head.start()
+ }
+ }
+ subscription?.dispose()
+ subscription = super.follow(merge(sources.map { Pair(it.first, it.second.getFlux()) }))
+ }
+
+ fun merge(sources: Iterable>>): Flux {
+ return Flux.merge(
+ sources.map {
+ it.second.transform(process(it.first))
+ }
+ ).distinctUntilChanged {
+ it.hash
+ }
+ }
+
+ fun process(priority: Int): Function, Flux> {
+ return Function { source ->
+ source.handle { block, sink ->
+ if (onNext(priority, block)) {
+ val top = lock.read {
+ head.lastOrNull()
+ }
+ if (top != null) {
+ sink.next(top.second)
+ }
+ }
+ }
+ }
+ }
+
+ private fun onNext(priority: Int, block: BlockContainer): Boolean {
+ val prev = lock.read {
+ head.find { it.second.height == block.height }
+ }
+ if (prev != null && prev.first > priority) {
+ return false
+ }
+ lock.write {
+ // first, check if existing data for the height is better
+ val prev = head.find { it.second.height == block.height }
+ if (prev != null && prev.first > priority) {
+ return false
+ }
+
+ // otherwise add it to the list
+ val fresh = if (head.isEmpty()) {
+ // just the first run, so nothing to do yet
+ listOf(Pair(priority, block))
+ } else if (head.last().second.height < block.height) {
+ // new block, just add it on top
+ head + Pair(priority, block)
+ } else if (head.all { it.first < priority }) {
+ // filled with low priority upstream that may be invalid, so replace the whole list
+ listOf(Pair(priority, block))
+ } else {
+ // situation when we have that block in the list and since we did the checks above it can have only a lower priority
+ // now there are two options: the same block or different block.
+ // if it's in the middle keep the rest anyway b/c a higher priority upstream would fix it with the following updates
+ head.map {
+ if (it.second.height == block.height) {
+ Pair(priority, block)
+ } else {
+ it
+ }
+ }
+ }
+ head = fresh.takeLast(headLimit)
+ return true
+ }
+ }
+
+ override fun stop() {
+ sources.forEach {
+ val head = it.second
+ if (head is Lifecycle && head.isRunning) {
+ head.stop()
+ }
+ }
+ subscription?.dispose()
+ subscription = null
+ }
+
+ override fun setCaches(caches: Caches) {
+ sources.forEach {
+ val head = it.second
+ if (head is CachesEnabled) {
+ head.setCaches(caches)
+ }
+ }
+ }
+}
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedPowHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedPowHead.kt
new file mode 100644
index 000000000..ab9f44318
--- /dev/null
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedPowHead.kt
@@ -0,0 +1,137 @@
+/**
+ * Copyright (c) 2020 EmeraldPay, Inc
+ * Copyright (c) 2019 ETCDEV GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.emeraldpay.dshackle.upstream
+
+import io.emeraldpay.dshackle.cache.Caches
+import io.emeraldpay.dshackle.cache.CachesEnabled
+import io.emeraldpay.dshackle.data.BlockContainer
+import org.springframework.context.Lifecycle
+import reactor.core.Disposable
+import reactor.core.publisher.Flux
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.function.Function
+import kotlin.concurrent.read
+import kotlin.concurrent.write
+
+class MergedPowHead(
+ private val sources: Iterable
+) : AbstractHead(), Lifecycle, CachesEnabled {
+
+ private var subscription: Disposable? = null
+
+ private val lock = ReentrantReadWriteLock()
+ private val headLimit = 16
+ private var head: List = emptyList()
+
+ override fun isRunning(): Boolean {
+ return subscription != null
+ }
+
+ override fun start() {
+ sources.forEach { head ->
+ if (head is Lifecycle && !head.isRunning) {
+ head.start()
+ }
+ }
+ subscription?.dispose()
+ subscription = super.follow(merge(sources.map { it.getFlux() }))
+ }
+
+ fun merge(sources: Iterable>): Flux {
+ return Flux.merge(
+ sources.map {
+ it.transform(process())
+ }
+ ).distinctUntilChanged {
+ it.hash
+ }
+ }
+
+ fun process(): Function, Flux> {
+ return Function { source ->
+ source.handle { block, sink ->
+ if (onNext(block)) {
+ val top = lock.read {
+ head.lastOrNull()
+ }
+ if (top != null) {
+ sink.next(top)
+ }
+ }
+ }
+ }
+ }
+
+ private fun onNext(block: BlockContainer): Boolean {
+ val prev = lock.read {
+ head.find { it.height == block.height }
+ }
+ if (prev != null && prev.difficulty > block.difficulty) {
+ return false
+ }
+ lock.write {
+ // first, check if existing data for the height is better
+ val prev = head.find { it.height == block.height }
+ if (prev != null && prev.difficulty > block.difficulty) {
+ return false
+ }
+
+ // otherwise add it to the list
+ val fresh = if (head.isEmpty()) {
+ // just the first run, so nothing to do yet
+ listOf(block)
+ } else if (head.last().height < block.height) {
+ // new block, just add it on top
+ head + block
+ } else {
+ // situation when we have that block in the list and since we checked it above it has a lower priority
+ // now there are two options: the same block or different block.
+ // if it's in the middle keep the rest anyway b/c a higher priority upstream would fix it with the following updates
+ head.map {
+ if (it.height == block.height) {
+ block
+ } else {
+ it
+ }
+ }
+ }
+ head = fresh
+ // drop all blocks on top of this one if their difficulty is lower
+ .filterNot { it.height > block.height && it.difficulty < block.difficulty }
+ .takeLast(headLimit)
+ return true
+ }
+ }
+
+ override fun stop() {
+ sources.forEach { head ->
+ if (head is Lifecycle && head.isRunning) {
+ head.stop()
+ }
+ }
+ subscription?.dispose()
+ subscription = null
+ }
+
+ override fun setCaches(caches: Caches) {
+ sources.forEach {
+ if (it is CachesEnabled) {
+ it.setCaches(caches)
+ }
+ }
+ }
+}
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt
index 72c0edcaa..eb5ede34f 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt
@@ -26,7 +26,7 @@ import io.emeraldpay.dshackle.upstream.EmptyHead
import io.emeraldpay.dshackle.upstream.HardcodedReader
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IntegralRpcReader
-import io.emeraldpay.dshackle.upstream.MergedHead
+import io.emeraldpay.dshackle.upstream.MergedPowHead
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.VerifyingReader
@@ -104,7 +104,7 @@ open class BitcoinMultistream(
}
}
} else {
- val newHead = MergedHead(sourceUpstreams.map { it.getHead() }).apply {
+ val newHead = MergedPowHead(sourceUpstreams.map { it.getHead() }).apply {
this.start()
}
val lagObserver = BitcoinHeadLagObserver(newHead, sourceUpstreams)
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt
index c6b311aa0..5fddf3182 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt
@@ -25,7 +25,8 @@ import io.emeraldpay.dshackle.upstream.EmptyHead
import io.emeraldpay.dshackle.upstream.HardcodedReader
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IntegralRpcReader
-import io.emeraldpay.dshackle.upstream.MergedHead
+import io.emeraldpay.dshackle.upstream.MergedPosHead
+import io.emeraldpay.dshackle.upstream.MergedPowHead
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.VerifyingReader
@@ -72,6 +73,11 @@ open class EthereumMultistream(
else -> false
}
+ private val isPos = when (chain) {
+ Chain.ETHEREUM, Chain.TESTNET_GOERLI, Chain.TESTNET_HOLESKY, Chain.TESTNET_SEPOLIA -> true
+ else -> false
+ }
+
private val feeEstimation = if (supportsEIP1559) {
EthereumPriorityFees(this, dataReaders, 256)
} else {
@@ -131,8 +137,13 @@ open class EthereumMultistream(
}
}
} else {
- val heads = upstreams.map { it.getHead() }
- val newHead = MergedHead(heads).apply {
+ val newHead = if (isPos) {
+ val heads = upstreams.map { Pair(it.getOptions().priority, it.getHead()) }
+ MergedPosHead(heads)
+ } else {
+ val heads = upstreams.map { it.getHead() }
+ MergedPowHead(heads)
+ }.apply {
this.start()
}
val lagObserver = EthereumHeadLagObserver(newHead, upstreams as Collection)
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcUpstream.kt
index 8fb7891ce..2bbcb707b 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcUpstream.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcUpstream.kt
@@ -8,7 +8,7 @@ import io.emeraldpay.dshackle.reader.StandardRpcReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.ForkWatch
import io.emeraldpay.dshackle.upstream.Head
-import io.emeraldpay.dshackle.upstream.MergedHead
+import io.emeraldpay.dshackle.upstream.MergedPowHead
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.calls.CallMethods
@@ -106,7 +106,7 @@ open class EthereumRpcUpstream(
val rpcHead = EthereumRpcHead(chain, getIngressReader(), Duration.ofSeconds(60)).apply {
start()
}
- MergedHead(listOf(rpcHead, wsHead)).apply {
+ MergedPowHead(listOf(rpcHead, wsHead)).apply {
start()
}
} else {
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/MergedHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/MergedHeadSpec.groovy
deleted file mode 100644
index 217f2f740..000000000
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/MergedHeadSpec.groovy
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Copyright (c) 2021 EmeraldPay, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.emeraldpay.dshackle.upstream
-
-import org.springframework.context.Lifecycle
-import reactor.core.publisher.Flux
-import spock.lang.Specification
-
-class MergedHeadSpec extends Specification {
-
- def "ensures that heads are running on start"() {
- setup:
- def head1 = Stub(TestHead1) {
- _ * getFlux() >> Flux.empty()
- }
- def head2 = Mock(TestHead2) {
- _ * isRunning() >> true
- _ * getFlux() >> Flux.empty()
- }
- def head3 = Mock(TestHead2) {
- _ * isRunning() >> false
- _ * getFlux() >> Flux.empty()
- }
-
- when:
- def merged = new MergedHead([head1, head2, head3])
- merged.start()
-
- then:
- 1 * head3.start()
- }
-
- class TestHead1 extends AbstractHead {
-
- }
-
- class TestHead2 extends AbstractHead implements Lifecycle {
-
- @Override
- void start() {
-
- }
-
- @Override
- void stop() {
-
- }
-
- @Override
- boolean isRunning() {
- return false
- }
- }
-}
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/DefaultEthereumHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/DefaultEthereumHeadSpec.groovy
deleted file mode 100644
index 4c842390c..000000000
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/DefaultEthereumHeadSpec.groovy
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Copyright (c) 2019 ETCDEV GmbH
- * Copyright (c) 2020 EmeraldPay, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.emeraldpay.dshackle.upstream.ethereum
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.emeraldpay.dshackle.Global
-import io.emeraldpay.dshackle.data.BlockContainer
-import io.emeraldpay.dshackle.test.TestingCommons
-import io.emeraldpay.etherjar.domain.BlockHash
-import io.emeraldpay.etherjar.rpc.json.BlockJson
-import io.emeraldpay.api.Chain
-import reactor.core.publisher.Flux
-import reactor.test.StepVerifier
-import spock.lang.Specification
-
-import java.time.Instant
-
-class DefaultEthereumHeadSpec extends Specification {
-
- DefaultEthereumHead head = new DefaultEthereumHead(Chain.ETHEREUM)
- ObjectMapper objectMapper = Global.objectMapper
-
- def blocks = (10L..20L).collect { i ->
- BlockContainer.from(
- new BlockJson().tap {
- it.number = 10000L + i
- it.hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec89152" + i)
- it.totalDifficulty = 11 * i
- it.timestamp = Instant.now()
- })
- }
-
- def "Starts to follow"() {
- when:
- head.follow(Flux.just(blocks[0]))
- def act = head.flux
- then:
- StepVerifier.create(act)
- .expectNext(blocks[0])
- .expectComplete()
- }
-
- def "Follows normal order"() {
- when:
- head.follow(Flux.just(blocks[0], blocks[1], blocks[3]))
- def act = head.flux
- then:
- StepVerifier.create(act)
- .expectNext(blocks[0])
- .expectNext(blocks[1])
- .expectNext(blocks[3])
- .expectComplete()
- }
-
- def "Ignores old"() {
- when:
- head.follow(Flux.just(blocks[0], blocks[3], blocks[1]))
- def act = head.flux
- then:
- StepVerifier.create(act)
- .expectNext(blocks[0])
- .expectNext(blocks[3])
- .expectComplete()
- }
-
- def "Ignores repeating"() {
- when:
- head.follow(Flux.just(blocks[0], blocks[3], blocks[3], blocks[3], blocks[2], blocks[3]))
- def act = head.flux
- then:
- StepVerifier.create(act)
- .expectNext(blocks[0])
- .expectNext(blocks[3])
- .expectComplete()
- }
-
- def "Ignores less difficult"() {
- when:
- def block3less = BlockContainer.from(
- new BlockJson().tap {
- it.number = blocks[3].height
- it.hash = BlockHash.from(blocks[3].hash.value)
- it.totalDifficulty = blocks[3].difficulty - 1
- it.timestamp = Instant.now()
- })
- head.follow(Flux.just(blocks[0], blocks[3], block3less))
- def act = head.flux
- then:
- StepVerifier.create(act)
- .expectNext(blocks[0])
- .expectNext(blocks[3])
- .expectComplete()
- }
-
- def "Replaces with more difficult"() {
- when:
- def block3less = BlockContainer.from(
- new BlockJson().tap {
- it.number = blocks[3].height
- it.hash = BlockHash.from(blocks[3].hash.value)
- it.totalDifficulty = blocks[3].difficulty + 1
- it.timestamp = Instant.now()
- })
- head.follow(Flux.just(blocks[0], blocks[3], block3less))
- def act = head.flux
- then:
- StepVerifier.create(act)
- .expectNext(blocks[0])
- .expectNext(block3less)
- .expectComplete()
- }
-}
diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/MergedPosHeadTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/MergedPosHeadTest.kt
new file mode 100644
index 000000000..d937b2cf1
--- /dev/null
+++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/MergedPosHeadTest.kt
@@ -0,0 +1,120 @@
+package io.emeraldpay.dshackle.upstream
+
+import io.emeraldpay.dshackle.data.BlockContainer
+import io.emeraldpay.etherjar.domain.BlockHash
+import io.emeraldpay.etherjar.rpc.json.BlockJson
+import io.emeraldpay.etherjar.rpc.json.TransactionRefJson
+import io.kotest.core.spec.style.ShouldSpec
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Sinks
+import reactor.test.StepVerifier
+import java.math.BigInteger
+import java.time.Duration
+import java.time.Instant
+
+class MergedPosHeadTest : ShouldSpec({
+
+ val blocks = (10L..20L).map { i ->
+ BlockContainer.from(
+ BlockJson().apply {
+ number = 10000L + i
+ hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec81111" + i)
+ totalDifficulty = BigInteger.ONE
+ timestamp = Instant.now()
+ }
+ )
+ }
+
+ val blocksOther = (10L..20L).map { i ->
+ BlockContainer.from(
+ BlockJson().apply {
+ number = 10000L + i
+ hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec82222" + i)
+ totalDifficulty = BigInteger.ONE
+ timestamp = Instant.now()
+ }
+ )
+ }
+
+ should("Follow normal order") {
+ val head = MergedPosHead(emptyList())
+ val act = head.merge(
+ listOf(
+ Pair(1, Flux.just(blocks[0], blocks[1], blocks[3]).delayElements(Duration.ofMillis(10))),
+ Pair(2, Flux.just(blocks[0], blocks[1], blocks[3]).delayElements(Duration.ofMillis(10)))
+ )
+ )
+ .take(Duration.ofMillis(500))
+
+ StepVerifier.create(act)
+ .expectNext(blocks[0])
+ .expectNext(blocks[1])
+ .expectNext(blocks[3])
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+
+ should("Keep higher priority") {
+ val head = MergedPosHead(emptyList())
+
+ val blocks1 = Sinks.many().unicast().onBackpressureBuffer()
+ val blocks2 = Sinks.many().unicast().onBackpressureBuffer()
+ val stream = head.merge(
+ listOf(
+ Pair(2, blocks1.asFlux()),
+ Pair(1, blocks2.asFlux())
+ )
+ )
+ .take(Duration.ofMillis(500))
+
+ StepVerifier.create(stream)
+ .then {
+ blocks1.tryEmitNext(blocks[0])
+ blocks2.tryEmitNext(blocksOther[0])
+ }
+ .expectNext(blocks[0])
+ .then {
+ blocks2.tryEmitNext(blocks[1])
+ blocks1.tryEmitNext(blocks[1])
+ }
+ .expectNext(blocks[1])
+ .then {
+ blocks1.tryEmitNext(blocks[2])
+ blocks2.tryEmitNext(blocksOther[2])
+ }
+ .expectNext(blocks[2])
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+
+ should("Restart if filled with low priority") {
+ val head = MergedPosHead(emptyList())
+
+ val blocks1 = Sinks.many().unicast().onBackpressureBuffer()
+ val blocks2 = Sinks.many().unicast().onBackpressureBuffer()
+ val stream = head.merge(
+ listOf(
+ Pair(2, blocks1.asFlux()),
+ Pair(1, blocks2.asFlux())
+ )
+ )
+ .take(Duration.ofMillis(500))
+
+ StepVerifier.create(stream)
+ .then {
+ blocks2.tryEmitNext(blocksOther[3])
+ }
+ .expectNext(blocksOther[3])
+ .then {
+ blocks2.tryEmitNext(blocksOther[4])
+ }
+ .expectNext(blocksOther[4])
+ .then {
+ blocks1.tryEmitNext(blocks[2])
+ }
+ // a block from the high priority source
+ .expectNext(blocks[2])
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+})
diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/MergedPowHeadTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/MergedPowHeadTest.kt
new file mode 100644
index 000000000..34cfc5c50
--- /dev/null
+++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/MergedPowHeadTest.kt
@@ -0,0 +1,134 @@
+package io.emeraldpay.dshackle.upstream
+
+import io.emeraldpay.dshackle.data.BlockContainer
+import io.emeraldpay.etherjar.domain.BlockHash
+import io.emeraldpay.etherjar.rpc.json.BlockJson
+import io.emeraldpay.etherjar.rpc.json.TransactionRefJson
+import io.kotest.core.spec.style.ShouldSpec
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import org.springframework.context.Lifecycle
+import reactor.core.publisher.Flux
+import reactor.test.StepVerifier
+import java.math.BigInteger
+import java.time.Duration
+import java.time.Instant
+
+class MergedPowHeadTest : ShouldSpec({
+
+ val blocks = (10L..20L).map { i ->
+ BlockContainer.from(
+ BlockJson().apply {
+ number = 10000L + i
+ hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec81111" + i)
+ totalDifficulty = BigInteger.valueOf(11 * i)
+ timestamp = Instant.now()
+ }
+ )
+ }
+
+ should("Start delegates") {
+ val head1 = mockk(relaxed = true)
+ val head2 = mockk()
+ val head = MergedPowHead(listOf(head1, head2))
+
+ every { head1.isRunning } returns false
+ every { head1.getFlux() } returns Flux.never()
+ every { head2.getFlux() } returns Flux.never()
+
+ head.start()
+
+ verify { head1.start() }
+ }
+
+ should("Follow") {
+ val head = MergedPowHead(emptyList())
+
+ val act = head.merge(listOf(Flux.just(blocks[0])))
+ .take(Duration.ofMillis(500))
+
+ StepVerifier.create(act)
+ .expectNext(blocks[0])
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+
+ should("Follow normal order") {
+ val head = MergedPowHead(emptyList())
+ val act = head.merge(listOf(Flux.just(blocks[0], blocks[1], blocks[3])))
+ .take(Duration.ofMillis(500))
+
+ StepVerifier.create(act)
+ .expectNext(blocks[0])
+ .expectNext(blocks[1])
+ .expectNext(blocks[3])
+ .expectComplete()
+ .verify(Duration.ofSeconds(100))
+ }
+
+ should("Ignore old") {
+ val head = MergedPowHead(emptyList())
+ val act = head.merge(listOf(Flux.just(blocks[0], blocks[3], blocks[1])))
+ .take(Duration.ofMillis(500))
+ StepVerifier.create(act)
+ .expectNext(blocks[0])
+ .expectNext(blocks[3])
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+
+ should("Ignore repeating") {
+ val head = MergedPowHead(emptyList())
+ val act = head.merge(listOf(Flux.just(blocks[0], blocks[3], blocks[3], blocks[3], blocks[2], blocks[3])))
+ .take(Duration.ofMillis(500))
+ StepVerifier.create(act)
+ .expectNext(blocks[0])
+ .expectNext(blocks[3])
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+
+ should("Ignore less difficult") {
+ val block3less = BlockContainer.from(
+ BlockJson().apply {
+ number = blocks[3].height
+ hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8000000")
+ totalDifficulty = blocks[3].difficulty - BigInteger.ONE
+ timestamp = Instant.now()
+ }
+ )
+
+ val head = MergedPowHead(emptyList())
+ val act = head.merge(listOf(Flux.just(blocks[0], blocks[3], block3less)))
+ .take(Duration.ofMillis(500))
+ StepVerifier.create(act)
+ .expectNext(blocks[0])
+ .expectNext(blocks[3])
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+
+ should("Replace with more difficult") {
+ val head = MergedPowHead(emptyList())
+ val block3less = BlockContainer.from(
+ BlockJson().apply {
+ number = blocks[3].height
+ hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8000000")
+ totalDifficulty = blocks[3].difficulty + BigInteger.ONE
+ timestamp = Instant.now()
+ }
+ )
+
+ val act = head.merge(listOf(Flux.just(blocks[0], blocks[3], block3less)))
+ .take(Duration.ofMillis(500))
+ StepVerifier.create(act)
+ .expectNext(blocks[0])
+ .expectNext(blocks[3])
+ .expectNext(block3less)
+ .expectComplete()
+ .verify(Duration.ofSeconds(1))
+ }
+})
+
+interface StartableHead : Head, Lifecycle