diff --git a/ergo-core/src/main/scala/org/ergoplatform/modifiers/history/header/HeaderSerializer.scala b/ergo-core/src/main/scala/org/ergoplatform/modifiers/history/header/HeaderSerializer.scala index d6d72f1aa4..c7688bac61 100644 --- a/ergo-core/src/main/scala/org/ergoplatform/modifiers/history/header/HeaderSerializer.scala +++ b/ergo-core/src/main/scala/org/ergoplatform/modifiers/history/header/HeaderSerializer.scala @@ -30,7 +30,8 @@ object HeaderSerializer extends ErgoSerializer[Header] { w.putBytes(h.votes) // For block version >= 2, this new byte encodes length of possible new fields. - // Set to 0 for now, so no new fields. + // starting from 5.0.23, new fields also included. + // They should be added in >= 5 bock version, see serializer if (h.version > Header.InitialVersion) { w.putUByte(h.unparsedBytes.length) w.putBytes(h.unparsedBytes) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 0b3a7c4618..108b3087c1 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -590,4 +590,41 @@ critical-dispatcher { fixed-pool-size = 2 } throughput = 1 +} + +# dispatcher for some API-related actors +api-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 1 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 2 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 4 +} + +indexer-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 1 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 1.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 4 + } } \ No newline at end of file diff --git a/src/main/scala/org/ergoplatform/GlobalConstants.scala b/src/main/scala/org/ergoplatform/GlobalConstants.scala new file mode 100644 index 0000000000..2536689a4f --- /dev/null +++ b/src/main/scala/org/ergoplatform/GlobalConstants.scala @@ -0,0 +1,16 @@ +package org.ergoplatform + +/** + * A singleton which holds constants needed around the whole Ergo Platform. + */ +object GlobalConstants { + + /** + * Name of dispatcher for actors processing API requests + * (to avoid clashing between blockchain processing and API actors) + */ + val ApiDispatcher = "api-dispatcher" + + val IndexerDispatcher = "indexer-dispatcher" + +} diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala index 8ff1f1f710..a7df32ee3f 100644 --- a/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoReadersHolder.scala @@ -1,6 +1,7 @@ package org.ergoplatform.nodeView import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} +import org.ergoplatform.GlobalConstants import org.ergoplatform.nodeView.ErgoReadersHolder._ import org.ergoplatform.nodeView.history.ErgoHistoryReader import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader @@ -68,7 +69,7 @@ object ErgoReadersHolderRef { def apply(viewHolderRef: ActorRef) (implicit context: ActorRefFactory): ActorRef = { - val props = Props(new ErgoReadersHolder(viewHolderRef)) + val props = Props(new ErgoReadersHolder(viewHolderRef)).withDispatcher(GlobalConstants.ApiDispatcher) context.actorOf(props) } diff --git a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala index 1a81fb3f08..29636c920a 100644 --- a/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala +++ b/src/main/scala/org/ergoplatform/nodeView/history/extra/ExtraIndexer.scala @@ -2,7 +2,7 @@ package org.ergoplatform.nodeView.history.extra import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash} import org.ergoplatform.ErgoBox.TokenId -import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, Pay2SAddress} +import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, GlobalConstants, Pay2SAddress} import org.ergoplatform.modifiers.history.BlockTransactions import org.ergoplatform.modifiers.history.header.Header import org.ergoplatform.modifiers.mempool.ErgoTransaction @@ -25,7 +25,7 @@ import spire.syntax.all.cfor import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.collection.concurrent -import scala.concurrent.Future +import scala.concurrent.{ExecutionContextExecutor, Future} import scala.jdk.CollectionConverters._ /** @@ -33,6 +33,8 @@ import scala.jdk.CollectionConverters._ */ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { + private implicit val ec: ExecutionContextExecutor = context.dispatcher + /** * Max buffer size (determined by config) */ @@ -89,14 +91,24 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { if (height % 1000 == 0) blockCache.keySet.filter(_ < height).map(blockCache.remove) if (readingUpTo - height < 300 && chainHeight - height > 1000) { readingUpTo = math.min(height + 1001, chainHeight) - val blockNums = height + 1 to readingUpTo by 50 - Future { - blockNums.zip(blockNums.tail).map { range => // ranges of 50 blocks for each thread to read - (range._1 until range._2).foreach { blockNum => + + if(height < history.fullBlockHeight - 1000) { + val blockNums = height + 1 to readingUpTo by 250 + blockNums.zip(blockNums.tail).map { range => // ranges of 250 blocks for each thread to read + Future { + (range._1 until range._2).foreach { blockNum => + history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _)) + } + } + } + } else { + val blockNums = height + 1 to readingUpTo + Future { + blockNums.foreach { blockNum => history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _)) } } - }(context.dispatcher) + } } txs } @@ -238,7 +250,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging { val height = headerOpt.map(_.height).getOrElse(state.indexedHeight) if (btOpt.isEmpty) { - log.warn(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying") + log.error(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying") return state.decrementIndexedHeight.copy(caughtUp = true) } @@ -584,6 +596,6 @@ object ExtraIndexer { def apply(chainSettings: ChainSettings, cacheSettings: CacheSettings)(implicit system: ActorSystem): ActorRef = { val props = Props.create(classOf[ExtraIndexer], cacheSettings, chainSettings.addressEncoder) - system.actorOf(props) + system.actorOf(props.withDispatcher(GlobalConstants.IndexerDispatcher)) } } diff --git a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala index 41e48f9fb8..b64b43522f 100644 --- a/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala +++ b/src/main/scala/org/ergoplatform/nodeView/wallet/ErgoWalletActor.scala @@ -509,6 +509,7 @@ object ErgoWalletActor extends ScorexLogging { boxSelector: BoxSelector, historyReader: ErgoHistoryReader)(implicit actorSystem: ActorSystem): ActorRef = { val props = Props(classOf[ErgoWalletActor], settings, parameters, service, boxSelector, historyReader) + .withDispatcher(GlobalConstants.ApiDispatcher) val walletActorRef = actorSystem.actorOf(props) CoordinatedShutdown(actorSystem).addActorTerminationTask( CoordinatedShutdown.PhaseBeforeServiceUnbind,