Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solution: newPendingTransactions subscription #191

Merged
merged 2 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Global.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import io.emeraldpay.dshackle.upstream.bitcoin.data.EsploraUnspent
import io.emeraldpay.dshackle.upstream.bitcoin.data.EsploraUnspentDeserializer
import io.emeraldpay.dshackle.upstream.bitcoin.data.RpcUnspent
import io.emeraldpay.dshackle.upstream.bitcoin.data.RpcUnspentDeserializer
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.TransactionIdSerializer
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.etherjar.domain.TransactionId
import io.emeraldpay.grpc.Chain
import java.text.SimpleDateFormat
import java.util.Locale
Expand Down Expand Up @@ -79,6 +81,7 @@ class Global {
private fun createObjectMapper(): ObjectMapper {
val module = SimpleModule("EmeraldDshackle", Version(1, 0, 0, null, null, null))
module.addSerializer(JsonRpcResponse::class.java, JsonRpcResponse.ResponseJsonSerializer())
module.addSerializer(TransactionId::class.java, TransactionIdSerializer())

module.addDeserializer(EsploraUnspent::class.java, EsploraUnspentDeserializer())
module.addDeserializer(RpcUnspent::class.java, RpcUnspentDeserializer())
Expand Down
101 changes: 101 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/DurableFlux.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package io.emeraldpay.dshackle.commons

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.util.backoff.BackOff
import org.springframework.util.backoff.BackOffExecution
import org.springframework.util.backoff.ExponentialBackOff
import org.springframework.util.backoff.FixedBackOff
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

/**
* A flux holder that reconnects to it on failure taking into account a back off strategy
*/
class DurableFlux<T>(
private val provider: () -> Flux<T>,
private val errorBackOff: BackOff,
private val log: Logger,
) {

companion object {
private val defaultLog = LoggerFactory.getLogger(DurableFlux::class.java)

@JvmStatic
fun newBuilder(): Builder<*> {
return Builder<Any>()
}
}

private var messagesSinceStart = 0
private var errorBackOffExecution = errorBackOff.start()

fun connect(): Flux<T> {
return provider.invoke()
.doOnNext {
if (messagesSinceStart == 0) {
errorBackOffExecution = errorBackOff.start()
}
messagesSinceStart++
}
.doOnSubscribe {
messagesSinceStart = 0
}
.onErrorResume { t ->
val backoff = errorBackOffExecution.nextBackOff()
if (backoff != BackOffExecution.STOP) {
log.warn("Connection closed with ${t.message}. Reconnecting in ${backoff}ms")
connect().delaySubscription(Duration.ofMillis(backoff))
} else {
log.warn("Connection closed with ${t.message}. Not reconnecting")
Mono.error(t)
}
}
}

class Builder<T> {

private var provider: (() -> Flux<T>)? = null

protected var errorBackOff: BackOff = FixedBackOff(1_000, Long.MAX_VALUE)
protected var log: Logger = DurableFlux.defaultLog

@Suppress("UNCHECKED_CAST")
fun <X> using(provider: () -> Flux<X>): Builder<X> {
this.provider = provider as () -> Flux<T>
return this as Builder<X>
}

fun backoffOnError(time: Duration): Builder<T> {
errorBackOff = FixedBackOff(time.toMillis(), Long.MAX_VALUE)
return this
}

fun backoffOnError(time: Duration, multiplier: Double, max: Duration? = null): Builder<T> {
errorBackOff = ExponentialBackOff(time.toMillis(), multiplier).also {
if (max != null) {
it.maxInterval = max.toMillis()
}
}
return this
}

fun backoffOnError(backOff: BackOff): Builder<T> {
errorBackOff = backOff
return this
}

fun logTo(log: Logger): Builder<T> {
this.log = log
return this
}

fun build(): DurableFlux<T> {
if (provider == null) {
throw IllegalStateException("No provider for original Flux")
}
return DurableFlux(provider!!, errorBackOff, log)
}
}
}
152 changes: 152 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/ExpiringSet.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package io.emeraldpay.dshackle.commons

import org.apache.commons.collections4.iterators.UnmodifiableIterator
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.LinkedList
import java.util.TreeSet
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
* A naive implementation of a Set with a limit for elements and an expiration time. Supposed to be used a filter for uniqueness.
* Internally it uses a TreeSet and a journal of added elements, which is used ot remove elements when they expire or the list grows too large.
* It's tread safe, but may be suboptimal to use in multithreaded scenario because of internal locks.
*/
class ExpiringSet<T>(
ttl: Duration,
comparator: Comparator<T>,
val limit: Int,
) : MutableSet<T> {

companion object {
private val log = LoggerFactory.getLogger(ExpiringSet::class.java)
}

private val tree = TreeSet<T>(comparator)
private val lock = ReentrantLock()
private val journal = LinkedList<JournalItem<T>>()
private var count = 0

private val ttl = ttl.toMillis()

data class JournalItem<T>(
val since: Long = System.currentTimeMillis(),
val value: T
) {
fun isExpired(ttl: Long): Boolean {
return System.currentTimeMillis() > since + ttl
}
}

override val size: Int
get() = count

override fun clear() {
lock.withLock {
tree.clear()
journal.clear()
count = 0
}
}

override fun addAll(elements: Collection<T>): Boolean {
var changed = false
elements.forEach {
changed = changed || add(it)
}
return changed
}

override fun add(element: T): Boolean {
lock.withLock {
val added = tree.add(element)
if (added) {
journal.offer(JournalItem(value = element))
count++
shrink()
}
return added
}
}

override fun isEmpty(): Boolean {
return count == 0
}

override fun iterator(): MutableIterator<T> {
// not mutable
return UnmodifiableIterator.unmodifiableIterator(tree.iterator())
}

override fun retainAll(elements: Collection<T>): Boolean {
lock.withLock {
var changed = false
val iter = tree.iterator()
while (iter.hasNext()) {
val next = iter.next()
if (!elements.contains(next)) {
changed = true
iter.remove()
count--
}
}
return changed
}
}

override fun removeAll(elements: Collection<T>): Boolean {
var changed = false
elements.forEach {
changed = changed || remove(it)
}
return changed
}

override fun remove(element: T): Boolean {
lock.withLock {
val changed = tree.remove(element)
if (changed) {
count--
}
return changed
}
}

override fun containsAll(elements: Collection<T>): Boolean {
return elements.all { contains(it) }
}

override fun contains(element: T): Boolean {
lock.withLock {
return tree.contains(element)
}
}

fun shrink() {
lock.withLock {
val iter = journal.iterator()
val removeAtLeast = (count - limit).coerceAtLeast(0)
var removed = 0
var stop = false
while (!stop && iter.hasNext()) {
val next = iter.next()
val overflow = removeAtLeast > removed
val expired = next.isExpired(ttl)
if (overflow || expired) {
iter.remove()
if (tree.remove(next.value)) {
removed++
}
}
// we always delete expired elements so don't stop on that
if (!expired) {
// but if we already deleted all non-expired element (i.e., started because it grew too large)
// then we stop as soon as we don't have any overflow
stop = !overflow
}
}
count -= removed
}
}
}
67 changes: 67 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/SharedFluxHolder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.emeraldpay.dshackle.commons

import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write

/**
* A flux holder that that creates it only if requested. Keeps it for the following calls, so all the following calls will
* reuse it. Forgets as soon as it completes/cancelled, so it will be recreated again if needed.
*/
class SharedFluxHolder<T>(
/**
* Provider for the flux. Note that it can be called multiple times but only one is used at the same time.
* I.e., if there is a few calls because of a thread-race only one is kept.
* But once it's completed a new one may be created if requested.
*/
private val provider: () -> Flux<T>
) {

companion object {
private val log = LoggerFactory.getLogger(SharedFluxHolder::class.java)
}

private val ids = AtomicLong()
private val lock = ReentrantReadWriteLock()
private var current: Holder<T>? = null

fun get(): Flux<T> {
lock.read {
if (current != null) {
return current!!.flux
}
}
// The following doesn't consume resources because it's just create a Flux without actual subscription
// So even for the case of a thread race it's okay to create many. B/c only one is going to be kept as `current` and subscribed
val id = ids.incrementAndGet()
val created = Holder(
provider.invoke()
.share()
.doFinally { onClose(id) },
id
)
lock.write {
if (current != null) {
return current!!.flux
}
current = created
}
return created.flux
}

private fun onClose(id: Long) {
lock.write {
if (current?.id == id) {
current = null
}
}
}

data class Holder<T>(
val flux: Flux<T>,
val id: Long,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class WebsocketHandler(
): Flux<String> {
return requests.flatMap { call ->
val method = call.method

if (method == "eth_subscribe") {
val methodParams = splitMethodParams(call.params)
if (methodParams != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ open class NativeSubscribe(
open fun subscribe(chain: Chain, method: String, params: Any?): Flux<out Any> {
val up = multistreamHolder.getUpstream(chain) ?: return Flux.error(SilentException.UnsupportedBlockchain(chain))
return (up as EthereumMultistream)
.getSubscribe()
.getSubscribtionApi()
.subscribe(method, params)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ class TrackERC20Address(
val asset = request.asset.code.lowercase(Locale.getDefault())
val tokenDefinition = tokens[TokenId(chain, asset)] ?: return Flux.empty()
val logs = getUpstream(chain)
.getSubscribe().logs
.start(
.getSubscribtionApi().logs
.create(
listOf(tokenDefinition.token.contract),
listOf(EventId.fromSignature("Transfer", "address", "address", "uint256"))
)
.connect()

return ethereumAddresses.extract(request.address)
.map { TrackedAddress(chain, it, tokenDefinition.token, tokenDefinition.name) }
Expand Down
Loading