From bdca297cf2869c3231ef24f28cd614861a4d41c1 Mon Sep 17 00:00:00 2001 From: Haemin Yoo Date: Thu, 17 Oct 2024 11:53:34 +0900 Subject: [PATCH] Expose current pod assignments (#145) * Expose current pod assignments * Expose new public operations in the companion object * Fix compilation for Scala 2.12 --- .../com/devsisters/shardcake/Sharding.scala | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala index 5d9565d..b13d052 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala @@ -57,7 +57,7 @@ class Sharding private ( shardManager.unregister(address).catchAllCause(ZIO.logErrorCause("Error during unregister", _)) ) - private def isSingletonNode: UIO[Boolean] = + val isSingletonNode: UIO[Boolean] = // Start singletons on the pod hosting shard 1. shardAssignments.get.map(_.get(1).contains(address)) @@ -127,7 +127,15 @@ class Sharding private ( pod = shards.get(shardId) } yield pod.contains(address) - def getPods: UIO[Set[PodAddress]] = + val getAssignments: UIO[Map[ShardId, PodAddress]] = + shardAssignments.get + + val thisPodAssignments: UIO[Chunk[ShardId]] = + getAssignments.map(a => + Chunk.fromIterable(a.view.collect { case (shardId, addr) if addr == this.address => shardId }) + ) + + val getPods: UIO[Set[PodAddress]] = shardAssignments.get.map(_.values.toSet) private def updateAssignments( @@ -622,6 +630,12 @@ object Sharding { def registerScoped: RIO[Sharding with Scope, Unit] = Sharding.register.withFinalizer(_ => Sharding.unregister) + /** + * Returns true if current node contains the singletons + */ + def isSingletonNode: RIO[Sharding, Boolean] = + ZIO.serviceWithZIO[Sharding](_.isSingletonNode) + /** * Start a computation that is guaranteed to run only on a single pod. * Each pod should call `registerSingleton` but only a single pod will actually run it at any given time. @@ -676,6 +690,20 @@ object Sharding { ): URIO[Sharding, Broadcaster[Msg]] = ZIO.serviceWith[Sharding](_.broadcaster(topicType, sendTimeout)) + /** + * Get the list of shards and the pod that holds them. + * + * Note: ShardId may not show up if the shard is not assigned to any pod. + */ + def getAssignments: RIO[Sharding, Map[ShardId, PodAddress]] = + ZIO.serviceWithZIO[Sharding](_.getAssignments) + + /** + * Get the list of shards currently assigned to the current pod + */ + def thisPodAssignments: RIO[Sharding, Chunk[ShardId]] = + ZIO.serviceWithZIO[Sharding](_.thisPodAssignments) + /** * Get the list of pods currently registered to the Shard Manager */