Skip to content

Commit

Permalink
Merge branch 'v5.0.23' of github.com:ergoplatform/ergo into v5.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
kushti committed Oct 2, 2024
2 parents 48827a3 + 003bb84 commit 2d2c091
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ object HeaderSerializer extends ErgoSerializer[Header] {
w.putBytes(h.votes)

// For block version >= 2, this new byte encodes length of possible new fields.
// Set to 0 for now, so no new fields.
// starting from 5.0.23, new fields also included.
// They should be added in >= 5 bock version, see serializer
if (h.version > Header.InitialVersion) {
w.putUByte(h.unparsedBytes.length)
w.putBytes(h.unparsedBytes)
Expand Down
37 changes: 37 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -590,4 +590,41 @@ critical-dispatcher {
fixed-pool-size = 2
}
throughput = 1
}

# dispatcher for some API-related actors
api-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 4
}

indexer-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 4
}
}
16 changes: 16 additions & 0 deletions src/main/scala/org/ergoplatform/GlobalConstants.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.ergoplatform

/**
* A singleton which holds constants needed around the whole Ergo Platform.
*/
object GlobalConstants {

/**
* Name of dispatcher for actors processing API requests
* (to avoid clashing between blockchain processing and API actors)
*/
val ApiDispatcher = "api-dispatcher"

val IndexerDispatcher = "indexer-dispatcher"

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ergoplatform.nodeView

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.ergoplatform.GlobalConstants
import org.ergoplatform.nodeView.ErgoReadersHolder._
import org.ergoplatform.nodeView.history.ErgoHistoryReader
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
Expand Down Expand Up @@ -68,7 +69,7 @@ object ErgoReadersHolderRef {

def apply(viewHolderRef: ActorRef)
(implicit context: ActorRefFactory): ActorRef = {
val props = Props(new ErgoReadersHolder(viewHolderRef))
val props = Props(new ErgoReadersHolder(viewHolderRef)).withDispatcher(GlobalConstants.ApiDispatcher)
context.actorOf(props)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.ergoplatform.nodeView.history.extra

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
import org.ergoplatform.ErgoBox.TokenId
import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, Pay2SAddress}
import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, GlobalConstants, Pay2SAddress}
import org.ergoplatform.modifiers.history.BlockTransactions
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.ErgoTransaction
Expand All @@ -25,14 +25,16 @@ import spire.syntax.all.cfor
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.collection.concurrent
import scala.concurrent.Future
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.jdk.CollectionConverters._

/**
* Base trait for extra indexer actor and its test.
*/
trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {

private implicit val ec: ExecutionContextExecutor = context.dispatcher

/**
* Max buffer size (determined by config)
*/
Expand Down Expand Up @@ -89,14 +91,24 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
if (height % 1000 == 0) blockCache.keySet.filter(_ < height).map(blockCache.remove)
if (readingUpTo - height < 300 && chainHeight - height > 1000) {
readingUpTo = math.min(height + 1001, chainHeight)
val blockNums = height + 1 to readingUpTo by 50
Future {
blockNums.zip(blockNums.tail).map { range => // ranges of 50 blocks for each thread to read
(range._1 until range._2).foreach { blockNum =>

if(height < history.fullBlockHeight - 1000) {
val blockNums = height + 1 to readingUpTo by 250
blockNums.zip(blockNums.tail).map { range => // ranges of 250 blocks for each thread to read
Future {
(range._1 until range._2).foreach { blockNum =>
history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _))
}
}
}
} else {
val blockNums = height + 1 to readingUpTo
Future {
blockNums.foreach { blockNum =>
history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _))
}
}
}(context.dispatcher)
}
}
txs
}
Expand Down Expand Up @@ -238,7 +250,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
val height = headerOpt.map(_.height).getOrElse(state.indexedHeight)

if (btOpt.isEmpty) {
log.warn(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying")
log.error(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying")
return state.decrementIndexedHeight.copy(caughtUp = true)
}

Expand Down Expand Up @@ -584,6 +596,6 @@ object ExtraIndexer {

def apply(chainSettings: ChainSettings, cacheSettings: CacheSettings)(implicit system: ActorSystem): ActorRef = {
val props = Props.create(classOf[ExtraIndexer], cacheSettings, chainSettings.addressEncoder)
system.actorOf(props)
system.actorOf(props.withDispatcher(GlobalConstants.IndexerDispatcher))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ object ErgoWalletActor extends ScorexLogging {
boxSelector: BoxSelector,
historyReader: ErgoHistoryReader)(implicit actorSystem: ActorSystem): ActorRef = {
val props = Props(classOf[ErgoWalletActor], settings, parameters, service, boxSelector, historyReader)
.withDispatcher(GlobalConstants.ApiDispatcher)
val walletActorRef = actorSystem.actorOf(props)
CoordinatedShutdown(actorSystem).addActorTerminationTask(
CoordinatedShutdown.PhaseBeforeServiceUnbind,
Expand Down

0 comments on commit 2d2c091

Please sign in to comment.