Skip to content

Commit

Permalink
add build info to upstream (emeraldpay#162)
Browse files Browse the repository at this point in the history
* add build info to upstream

* fix tests

* add build info to node status subscription

* move proto field

* update proto submodule to master
  • Loading branch information
Vadim Vlasov authored Mar 20, 2023
1 parent 32c689c commit a3a89ab
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 23 deletions.
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/Describe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.rpc

import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.Common
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.DefaultUpstream
Expand All @@ -35,6 +36,7 @@ class Describe(
fun describe(requestMono: Mono<BlockchainOuterClass.DescribeRequest>): Mono<BlockchainOuterClass.DescribeResponse> {
return requestMono.map { _ ->
val resp = BlockchainOuterClass.DescribeResponse.newBuilder()
resp.buildInfoBuilder.version = Global.version
multistreamHolder.getAvailable().forEach { chain ->
multistreamHolder.getUpstream(chain).let { chainUpstreams ->
val status = subscribeStatus.chainStatus(chain, chainUpstreams.getStatus(), chainUpstreams)
Expand Down
12 changes: 10 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstream
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -118,8 +119,8 @@ class SubscribeNodeStatus(
return Flux.concat(currentState, statuses)
}

private fun buildDescription(ms: Multistream, up: Upstream): NodeDescription.Builder =
NodeDescription.newBuilder()
private fun buildDescription(ms: Multistream, up: Upstream): NodeDescription.Builder {
val builder = NodeDescription.newBuilder()
.setChain(Common.ChainRef.forNumber(ms.chain.id))
.setNodeId(up.nodeId().toInt())
.addAllNodeLabels(
Expand All @@ -138,6 +139,13 @@ class SubscribeNodeStatus(
)
.addAllSupportedSubscriptions(ms.getEgressSubscription().getAvailableTopics())
.addAllSupportedMethods(up.getMethods().getSupportedMethods())
(up as? GrpcUpstream)?.let {
it.getBuildInfo().version?.let { version ->
builder.nodeBuildInfoBuilder.setVersion(version)
}
}
return builder
}

private fun buildStatus(status: UpstreamAvailability, height: Long?): NodeStatus.Builder =
NodeStatus.newBuilder()
Expand Down
17 changes: 17 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/BuildInfo.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.api.proto.BlockchainOuterClass

data class BuildInfo(var version: String? = null) {
fun update(buildInfo: BuildInfo): Boolean {
val changed = buildInfo.version != version
version = buildInfo.version
return changed
}

companion object {
fun extract(buildInfo: BlockchainOuterClass.BuildInfo): BuildInfo {
return BuildInfo(buildInfo.version)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BuildInfo
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Lifecycle
Expand Down Expand Up @@ -103,6 +104,7 @@ class BitcoinGrpcUpstream(
private val grpcHead = GrpcHead(getId(), chain, this, remote, blockConverter, reloadBlock, MostWorkForkChoice())
private val timeout = Defaults.timeout
private var capabilities: Set<Capability> = emptySet()
private val buildInfo: BuildInfo = BuildInfo()

override fun getBlockchainApi(): ReactorBlockchainGrpc.ReactorBlockchainStub {
return remote
Expand Down Expand Up @@ -149,11 +151,18 @@ class BitcoinGrpcUpstream(
override fun stop() {
}

override fun update(conf: BlockchainOuterClass.DescribeChain): Boolean {
override fun getBuildInfo(): BuildInfo {
return buildInfo
}

override fun update(conf: BlockchainOuterClass.DescribeChain, buildInfo: BlockchainOuterClass.BuildInfo): Boolean {
val newBuildInfo = BuildInfo.extract(buildInfo)
val buildInfoChanged = this.buildInfo.update(newBuildInfo)
val newCapabilities = RemoteCapabilities.extract(conf)
conf.status?.let { status -> onStatus(status) }
return (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
capabilities = newCapabilities
}
return buildInfoChanged || upstreamStatusChanged
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.BuildInfo
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Lifecycle
Expand Down Expand Up @@ -109,6 +110,7 @@ open class EthereumGrpcUpstream(
private val upstreamStatus = GrpcUpstreamStatus(overrideLabels)
private val grpcHead = GrpcHead(getId(), chain, this, remote, blockConverter, reloadBlock, MostWorkForkChoice())
private var capabilities: Set<Capability> = emptySet()
private val buildInfo: BuildInfo = BuildInfo()

private val defaultReader: JsonRpcReader = client.getReader()
var timeout = Defaults.timeout
Expand All @@ -131,12 +133,19 @@ open class EthereumGrpcUpstream(
override fun stop() {
}

override fun update(conf: BlockchainOuterClass.DescribeChain): Boolean {
override fun getBuildInfo(): BuildInfo {
return buildInfo
}

override fun update(conf: BlockchainOuterClass.DescribeChain, buildInfo: BlockchainOuterClass.BuildInfo): Boolean {
val newBuildInfo = BuildInfo.extract(buildInfo)
val buildInfoChanged = this.buildInfo.update(newBuildInfo)
val newCapabilities = RemoteCapabilities.extract(conf)
conf.status?.let { status -> onStatus(status) }
return (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
capabilities = newCapabilities
}
return buildInfoChanged || upstreamStatusChanged
}

override fun getQuorumByLabel(): QuorumForLabels {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.BuildInfo
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Lifecycle
Expand Down Expand Up @@ -81,6 +82,7 @@ open class EthereumPosGrpcUpstream(
private val upstreamStatus = GrpcUpstreamStatus(overrideLabels)
private val grpcHead = GrpcHead(getId(), chain, this, remote, blockConverter, null, NoChoiceWithPriorityForkChoice(nodeRating, parentId))
private var capabilities: Set<Capability> = emptySet()
private val buildInfo: BuildInfo = BuildInfo()

private val defaultReader: JsonRpcReader = client.getReader()
private val timeout = Defaults.timeout
Expand All @@ -96,12 +98,19 @@ open class EthereumPosGrpcUpstream(
override fun stop() {
}

override fun update(conf: BlockchainOuterClass.DescribeChain): Boolean {
override fun getBuildInfo(): BuildInfo {
return buildInfo
}

override fun update(conf: BlockchainOuterClass.DescribeChain, buildInfo: BlockchainOuterClass.BuildInfo): Boolean {
val newBuildInfo = BuildInfo.extract(buildInfo)
val buildInfoChanged = this.buildInfo.update(newBuildInfo)
val newCapabilities = RemoteCapabilities.extract(conf)
conf.status?.let { status -> onStatus(status) }
return (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
capabilities = newCapabilities
}
return buildInfoChanged || upstreamStatusChanged
}

override fun getQuorumByLabel(): QuorumForLabels {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.upstream.grpc
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.BlockchainOuterClass.NativeSubscribeRequest
import io.emeraldpay.api.proto.ReactorBlockchainGrpc
import io.emeraldpay.dshackle.upstream.BuildInfo
import io.emeraldpay.dshackle.upstream.Upstream
import reactor.core.publisher.Flux

Expand All @@ -27,7 +28,9 @@ interface GrpcUpstream : Upstream {
* Update the configuration of the upstream with the new data.
* Called on the first creation, and each time a new state received from upstream
*/
fun update(conf: BlockchainOuterClass.DescribeChain): Boolean
fun update(conf: BlockchainOuterClass.DescribeChain, buildInfo: BlockchainOuterClass.BuildInfo): Boolean

fun getBuildInfo(): BuildInfo

fun getBlockchainApi(): ReactorBlockchainGrpc.ReactorBlockchainStub

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,16 @@ class GrpcUpstreams(
}

private fun processDescription(value: DescribeResponse): Flux<UpstreamChangeEvent> {
log.info("Start processing grpc upstream description for $id with chains ${value.chainsList.map { it.chain.name }}")
val chainNames = value.chainsList.map { it.chain.name }
val version = value.buildInfo.version
log.info("Start processing grpc upstream description for $id with chains $chainNames and version $version")
val current = value.chainsList.filter {
Chain.byId(it.chain.number) != Chain.UNSPECIFIED
}.mapNotNull { chainDetails ->
try {
val chain = Chain.byId(chainDetails.chain.number)
val up = getOrCreate(chain)
val changed = (up.upstream as GrpcUpstream).update(chainDetails)
val changed = (up.upstream as GrpcUpstream).update(chainDetails, value.buildInfo)
up.takeUnless {
changed && it.type == UpstreamChangeEvent.ChangeType.REVALIDATED
} ?: UpstreamChangeEvent(up.chain, up.upstream, UpstreamChangeEvent.ChangeType.UPDATED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.data.BlockId
import io.emeraldpay.dshackle.test.MockGrpcServer
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.BuildInfo
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient
import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
Expand All @@ -52,6 +53,7 @@ class EthereumGrpcUpstreamSpec extends Specification {
)

def hash = (byte)123
def buildInfo = new BuildInfo("v0.0.1-test")

def "Subscribe to head"() {
setup:
Expand Down Expand Up @@ -86,16 +88,22 @@ class EthereumGrpcUpstreamSpec extends Specification {
})
def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, chain, client, new JsonRpcGrpcClient(client, chain, metrics), null, ChainsConfig.ChainConfig.default())
upstream.setLag(0)
upstream.update(BlockchainOuterClass.DescribeChain.newBuilder()
.setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))
.addAllSupportedMethods(["eth_getBlockByHash"])
.build())
upstream.update(
BlockchainOuterClass.DescribeChain.newBuilder()
.setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))
.addAllSupportedMethods(["eth_getBlockByHash"])
.build(),
BlockchainOuterClass.BuildInfo.newBuilder()
.setVersion(buildInfo.version)
.build(),
)
when:
new Thread({ Thread.sleep(50); upstream.head.start() }).start()
def h = upstream.head.getFlux().next().block(Duration.ofSeconds(1))
then:
callData.chain == Chain.ETHEREUM.id
upstream.status == UpstreamAvailability.OK
upstream.getBuildInfo() == buildInfo
h.hash == BlockId.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7")
}

Expand Down Expand Up @@ -144,15 +152,21 @@ class EthereumGrpcUpstreamSpec extends Specification {
})
def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, Chain.ETHEREUM, client, new JsonRpcGrpcClient(client, Chain.ETHEREUM, metrics), null, ChainsConfig.ChainConfig.default())
upstream.setLag(0)
upstream.update(BlockchainOuterClass.DescribeChain.newBuilder()
.setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))
.addAllSupportedMethods(["eth_getBlockByHash"])
.build())
upstream.update(
BlockchainOuterClass.DescribeChain.newBuilder()
.setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))
.addAllSupportedMethods(["eth_getBlockByHash"])
.build(),
BlockchainOuterClass.BuildInfo.newBuilder()
.setVersion(buildInfo.version)
.build(),
)
when:
new Thread({ Thread.sleep(50); upstream.head.start() }).start()
def h = upstream.head.getFlux().take(Duration.ofSeconds(1)).last().block(Duration.ofSeconds(2))
then:
upstream.status == UpstreamAvailability.OK
upstream.getBuildInfo() == buildInfo
h.hash == BlockId.from("0x50d26e119968e791970d84a7bf5d0ec474d3ec2ef85d5ec8915210ac6bc09ad7")
h.height == 650246
}
Expand Down Expand Up @@ -206,16 +220,22 @@ class EthereumGrpcUpstreamSpec extends Specification {
})
def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, chain, client, new JsonRpcGrpcClient(client, chain, metrics), null, ChainsConfig.ChainConfig.default())
upstream.setLag(0)
upstream.update(BlockchainOuterClass.DescribeChain.newBuilder()
.setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))
.addAllSupportedMethods(["eth_getBlockByHash"])
.build())
upstream.update(
BlockchainOuterClass.DescribeChain.newBuilder()
.setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))
.addAllSupportedMethods(["eth_getBlockByHash"])
.build(),
BlockchainOuterClass.BuildInfo.newBuilder()
.setVersion(buildInfo.version)
.build(),
)
when:
new Thread({ Thread.sleep(50); upstream.head.start() }).start()
finished.get()
def h = upstream.head.getFlux().take(Duration.ofSeconds(1)).last().block(Duration.ofSeconds(2))
then:
upstream.status == UpstreamAvailability.OK
upstream.getBuildInfo() == buildInfo
h.hash == BlockId.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec891521a")
h.height == 650247
}
Expand Down

0 comments on commit a3a89ab

Please sign in to comment.