Skip to content

Commit

Permalink
Keep the shard ids sorted for logging and api (#147)
Browse files Browse the repository at this point in the history
* Use SortedMap for in-memory shard state

* Use BitSet for assignments and unassignments

* Specify SortedMap for shards

* Revert and sort on log

* Deduplicate sorting of shards, use implementation that works on 2.12

* Format GraphQLApi file
  • Loading branch information
yoohaemin authored Oct 17, 2024
1 parent ac770c8 commit 4a19c36
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
8 changes: 8 additions & 0 deletions core/src/main/scala/com/devsisters/shardcake/package.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package com.devsisters

import scala.collection.mutable

package object shardcake {
type ShardId = Int
type EpochMillis = Long

private[shardcake] def renderShardIds(ids: Iterable[ShardId]): String =
ids
.foldLeft(mutable.BitSet.empty)(_ += _)
.mkString("[", ", ", "]")

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,23 +102,23 @@ class Sharding private (
shardAssignments.update(shards.foldLeft(_) { case (map, shard) => map.updated(shard, address) }) *>
Metrics.shards.incrementBy(shards.size) *>
startSingletonsIfNeeded *>
ZIO.logDebug(s"Assigned shards: $shards")
ZIO.logDebug(s"Assigned shards: ${renderShardIds(shards)}")
}
.unit

private[shardcake] def unassign(shards: Set[ShardId]): UIO[Unit] =
shardAssignments.update(shards.foldLeft(_) { case (map, shard) =>
if (map.get(shard).contains(address)) map - shard else map
}) *>
ZIO.logDebug(s"Unassigning shards: $shards") *>
ZIO.logDebug(s"Unassigning shards: ${renderShardIds(shards)}") *>
entityStates.get.flatMap(state =>
ZIO.foreachDiscard(state.values)(
_.entityManager.terminateEntitiesOnShards(shards) // this will return once all shards are terminated
)
) *>
Metrics.shards.decrementBy(shards.size) *>
stopSingletonsIfNeeded *>
ZIO.logDebug(s"Unassigned shards: $shards")
ZIO.logDebug(s"Unassigned shards: ${renderShardIds(shards)}")

private[shardcake] def isEntityOnLocalShards(recipientType: RecipientType[_], entityId: String): UIO[Boolean] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ object GraphQLApi extends GenericSchema[ShardManager] {
val api: GraphQL[ShardManager] =
graphQL[ShardManager, Queries, Mutations, Subscriptions](
RootResolver(
Queries(ZIO.serviceWithZIO(_.getAssignments.map(_.map { case (k, v) => Assignment(k, v) }.toList))),
Queries(
ZIO.serviceWithZIO(
_.getAssignments.map(_.map { case (k, v) => Assignment(k, v) }.toList.sortBy(_.shardId))
)
),
Mutations(
pod => ZIO.serviceWithZIO(_.register(pod)),
pod => ZIO.serviceWithZIO(_.unregister(pod.address)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,12 @@ object ShardManager {

sealed trait ShardingEvent
object ShardingEvent {
case class ShardsAssigned(pod: PodAddress, shards: Set[ShardId]) extends ShardingEvent
case class ShardsUnassigned(pod: PodAddress, shards: Set[ShardId]) extends ShardingEvent
case class ShardsAssigned(pod: PodAddress, shards: Set[ShardId]) extends ShardingEvent {
override def toString: String = s"ShardsAssigned(pod=$pod, shards=${renderShardIds(shards)})"
}
case class ShardsUnassigned(pod: PodAddress, shards: Set[ShardId]) extends ShardingEvent {
override def toString: String = s"ShardsUnassigned(pod=$pod, shards=${renderShardIds(shards)})"
}
case class PodRegistered(pod: PodAddress) extends ShardingEvent
case class PodUnregistered(pod: PodAddress) extends ShardingEvent
case class PodHealthChecked(pod: PodAddress) extends ShardingEvent
Expand Down

0 comments on commit 4a19c36

Please sign in to comment.