Skip to content

Commit

Permalink
Change behavior of simulateRemotePods to use the Pods API (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Sep 19, 2024
1 parent ac1b90b commit 73c4c5e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 23 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ val sttpVersion = "3.9.6"
val calibanVersion = "2.8.1"
val redis4catsVersion = "1.5.2"
val redissonVersion = "3.27.1"
val scalaKryoVersion = "1.0.2"
val scalaKryoVersion = "1.2.0"
val testContainersVersion = "0.41.3"
val scalaCompatVersion = "2.12.0"

Expand Down Expand Up @@ -212,6 +212,13 @@ lazy val commonSettings = Def.settings(
"com.dimafeng" %% "testcontainers-scala-core" % testContainersVersion % Test
),
Test / fork := true,
Test / javaOptions ++= Seq(
// Kryo requires this with the recent versions of Java
"--add-opens=java.base/java.util=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.sql/java.sql=ALL-UNNAMED"
),
scalacOptions ++= Seq(
"-deprecation",
"-encoding",
Expand Down
26 changes: 10 additions & 16 deletions entities/src/main/scala/com/devsisters/shardcake/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,21 +257,15 @@ class Sharding private (
replyId: Option[String],
replyChannel: ReplyChannel[Res]
): Task[Unit] =
if (config.simulateRemotePods) {
serialization
.encode(msg)
.flatMap(bytes => sendToLocalEntity(BinaryMessage(entityId, recipientTypeName, bytes, replyId), replyChannel))
} else {
// if pod = self, shortcut and send directly without serialization
entityStates.get.flatMap(
_.get(recipientTypeName) match {
case Some(state) =>
state.entityManager.asInstanceOf[EntityManager[Msg]].send(entityId, msg, replyId, replyChannel)
case None =>
ZIO.fail(new Exception(s"Entity type $recipientTypeName was not registered."))
}
)
}
// if pod = self, shortcut and send directly without serialization
entityStates.get.flatMap(
_.get(recipientTypeName) match {
case Some(state) =>
state.entityManager.asInstanceOf[EntityManager[Msg]].send(entityId, msg, replyId, replyChannel)
case None =>
ZIO.fail(new Exception(s"Entity type $recipientTypeName was not registered."))
}
)

private def sendToPod[Msg, Res](
recipientTypeName: String,
Expand All @@ -281,7 +275,7 @@ class Sharding private (
replyChannel: ReplyChannel[Res],
replyId: Option[String]
): Task[Unit] =
if (pod == address) {
if (pod == address && !config.simulateRemotePods) {
val run = sendChannel.foreach(sendToSelf(recipientTypeName, entityId, _, replyId, replyChannel))
sendChannel match {
case _: SendChannel.Single[_] => run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ import scala.util.Success

object BroadcastingSpec extends ZIOSpecDefault {

private val config = ZLayer.succeed(
Config.default.copy(
simulateRemotePods = true
)
)
private val config = ZLayer.succeed(Config.default)

def spec: Spec[TestEnvironment with Scope, Any] =
suite("BroadcastingSpec")(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ object StorageRedis {

def savePods(pods: Map[PodAddress, Pod]): Task[Unit] =
stringClient.del(config.podsKey) *>
stringClient.hSet(config.podsKey, pods.map { case (k, v) => k.toString -> v.version }).unit
stringClient
.hSet(config.podsKey, pods.map { case (k, v) => k.toString -> v.version })
.when(pods.nonEmpty)
.unit
}
}
}

0 comments on commit 73c4c5e

Please sign in to comment.