Skip to content

Commit

Permalink
getting dispatchers back
Browse files Browse the repository at this point in the history
  • Loading branch information
kushti committed Sep 30, 2024
1 parent 756a8bb commit 003bb84
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 7 deletions.
37 changes: 37 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -590,4 +590,41 @@ critical-dispatcher {
fixed-pool-size = 2
}
throughput = 1
}

# dispatcher for some API-related actors
api-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 2
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 4
}

indexer-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 4
}
}
16 changes: 16 additions & 0 deletions src/main/scala/org/ergoplatform/GlobalConstants.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.ergoplatform

/**
* A singleton which holds constants needed around the whole Ergo Platform.
*/
object GlobalConstants {

/**
* Name of dispatcher for actors processing API requests
* (to avoid clashing between blockchain processing and API actors)
*/
val ApiDispatcher = "api-dispatcher"

val IndexerDispatcher = "indexer-dispatcher"

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ergoplatform.nodeView

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.ergoplatform.GlobalConstants
import org.ergoplatform.nodeView.ErgoReadersHolder._
import org.ergoplatform.nodeView.history.ErgoHistoryReader
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
Expand Down Expand Up @@ -68,7 +69,7 @@ object ErgoReadersHolderRef {

def apply(viewHolderRef: ActorRef)
(implicit context: ActorRefFactory): ActorRef = {
val props = Props(new ErgoReadersHolder(viewHolderRef))
val props = Props(new ErgoReadersHolder(viewHolderRef)).withDispatcher(GlobalConstants.ApiDispatcher)
context.actorOf(props)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.ergoplatform.nodeView.history.extra

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash}
import org.ergoplatform.ErgoBox.TokenId
import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, Pay2SAddress}
import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder, GlobalConstants, Pay2SAddress}
import org.ergoplatform.modifiers.history.BlockTransactions
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.ErgoTransaction
Expand All @@ -25,14 +25,16 @@ import spire.syntax.all.cfor
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.collection.concurrent
import scala.concurrent.Future
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.jdk.CollectionConverters._

/**
* Base trait for extra indexer actor and its test.
*/
trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {

private implicit val ec: ExecutionContextExecutor = context.dispatcher

/**
* Max buffer size (determined by config)
*/
Expand Down Expand Up @@ -97,15 +99,15 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
(range._1 until range._2).foreach { blockNum =>
history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _))
}
}(context.dispatcher)
}
}
} else {
val blockNums = height + 1 to readingUpTo
Future {
blockNums.foreach { blockNum =>
history.bestBlockTransactionsAt(blockNum).map(blockCache.put(blockNum, _))
}
}(context.dispatcher)
}
}
}
txs
Expand Down Expand Up @@ -248,7 +250,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
val height = headerOpt.map(_.height).getOrElse(state.indexedHeight)

if (btOpt.isEmpty) {
log.warn(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying")
log.error(s"Could not read block $height / $chainHeight from database, waiting for new block until retrying")
return state.decrementIndexedHeight.copy(caughtUp = true)
}

Expand Down Expand Up @@ -594,6 +596,6 @@ object ExtraIndexer {

def apply(chainSettings: ChainSettings, cacheSettings: CacheSettings)(implicit system: ActorSystem): ActorRef = {
val props = Props.create(classOf[ExtraIndexer], cacheSettings, chainSettings.addressEncoder)
system.actorOf(props)
system.actorOf(props.withDispatcher(GlobalConstants.IndexerDispatcher))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ object ErgoWalletActor extends ScorexLogging {
boxSelector: BoxSelector,
historyReader: ErgoHistoryReader)(implicit actorSystem: ActorSystem): ActorRef = {
val props = Props(classOf[ErgoWalletActor], settings, parameters, service, boxSelector, historyReader)
.withDispatcher(GlobalConstants.ApiDispatcher)
val walletActorRef = actorSystem.actorOf(props)
CoordinatedShutdown(actorSystem).addActorTerminationTask(
CoordinatedShutdown.PhaseBeforeServiceUnbind,
Expand Down

0 comments on commit 003bb84

Please sign in to comment.