From 73c4c5e70cc31e56705cd64336a668ca9cb82a76 Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Thu, 19 Sep 2024 12:12:08 +0900 Subject: [PATCH] Change behavior of simulateRemotePods to use the Pods API (#142) --- build.sbt | 9 ++++++- .../com/devsisters/shardcake/Sharding.scala | 26 +++++++------------ .../shardcake/BroadcastingSpec.scala | 6 +---- .../devsisters/shardcake/StorageRedis.scala | 5 +++- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/build.sbt b/build.sbt index 8dd7a1e..2802809 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ val sttpVersion = "3.9.6" val calibanVersion = "2.8.1" val redis4catsVersion = "1.5.2" val redissonVersion = "3.27.1" -val scalaKryoVersion = "1.0.2" +val scalaKryoVersion = "1.2.0" val testContainersVersion = "0.41.3" val scalaCompatVersion = "2.12.0" @@ -212,6 +212,13 @@ lazy val commonSettings = Def.settings( "com.dimafeng" %% "testcontainers-scala-core" % testContainersVersion % Test ), Test / fork := true, + Test / javaOptions ++= Seq( + // Kryo requires this with the recent versions of Java + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.sql/java.sql=ALL-UNNAMED" + ), scalacOptions ++= Seq( "-deprecation", "-encoding", diff --git a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala index ee50435..a9c8caa 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala @@ -257,21 +257,15 @@ class Sharding private ( replyId: Option[String], replyChannel: ReplyChannel[Res] ): Task[Unit] = - if (config.simulateRemotePods) { - serialization - .encode(msg) - .flatMap(bytes => sendToLocalEntity(BinaryMessage(entityId, recipientTypeName, bytes, replyId), replyChannel)) - } else { - // if pod = self, shortcut and send directly without serialization - entityStates.get.flatMap( - _.get(recipientTypeName) match { - case Some(state) => - state.entityManager.asInstanceOf[EntityManager[Msg]].send(entityId, msg, replyId, replyChannel) - case None => - ZIO.fail(new Exception(s"Entity type $recipientTypeName was not registered.")) - } - ) - } + // if pod = self, shortcut and send directly without serialization + entityStates.get.flatMap( + _.get(recipientTypeName) match { + case Some(state) => + state.entityManager.asInstanceOf[EntityManager[Msg]].send(entityId, msg, replyId, replyChannel) + case None => + ZIO.fail(new Exception(s"Entity type $recipientTypeName was not registered.")) + } + ) private def sendToPod[Msg, Res]( recipientTypeName: String, @@ -281,7 +275,7 @@ class Sharding private ( replyChannel: ReplyChannel[Res], replyId: Option[String] ): Task[Unit] = - if (pod == address) { + if (pod == address && !config.simulateRemotePods) { val run = sendChannel.foreach(sendToSelf(recipientTypeName, entityId, _, replyId, replyChannel)) sendChannel match { case _: SendChannel.Single[_] => run diff --git a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala index 5e9d5eb..f46032f 100644 --- a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala +++ b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala @@ -9,11 +9,7 @@ import scala.util.Success object BroadcastingSpec extends ZIOSpecDefault { - private val config = ZLayer.succeed( - Config.default.copy( - simulateRemotePods = true - ) - ) + private val config = ZLayer.succeed(Config.default) def spec: Spec[TestEnvironment with Scope, Any] = suite("BroadcastingSpec")( diff --git a/storage-redis/src/main/scala/com/devsisters/shardcake/StorageRedis.scala b/storage-redis/src/main/scala/com/devsisters/shardcake/StorageRedis.scala index cda21f2..39536bf 100644 --- a/storage-redis/src/main/scala/com/devsisters/shardcake/StorageRedis.scala +++ b/storage-redis/src/main/scala/com/devsisters/shardcake/StorageRedis.scala @@ -52,7 +52,10 @@ object StorageRedis { def savePods(pods: Map[PodAddress, Pod]): Task[Unit] = stringClient.del(config.podsKey) *> - stringClient.hSet(config.podsKey, pods.map { case (k, v) => k.toString -> v.version }).unit + stringClient + .hSet(config.podsKey, pods.map { case (k, v) => k.toString -> v.version }) + .when(pods.nonEmpty) + .unit } } }