Skip to content

Commit

Permalink
Wait until we get initial assignments during startup (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Nov 20, 2023
1 parent 9094a08 commit 426e209
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions entities/src/main/scala/com/devsisters/shardcake/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,24 @@ class Sharding private (
))
}

private[shardcake] val refreshAssignments: ZIO[Scope, Nothing, Unit] = {
val assignmentStream =
ZStream.fromZIO(
shardManager.getAssignments.map(_ -> true) // first, get the assignments from the shard manager directly
) ++
storage.assignmentsStream.map(_ -> false) // then, get assignments changes from Redis
assignmentStream.mapZIO { case (assignmentsOpt, fromShardManager) =>
updateAssignments(assignmentsOpt, fromShardManager)
}.runDrain
}.retry(Schedule.fixed(config.refreshAssignmentsRetryInterval))
.interruptible
.forkDaemon
.withFinalizer(_.interrupt)
.unit
private[shardcake] val refreshAssignments: ZIO[Scope, Nothing, Unit] =
for {
latch <- Promise.make[Nothing, Unit]
assignmentStream = ZStream.fromZIO(
// first, get the assignments from the shard manager directly
shardManager.getAssignments.map(_ -> true)
) ++
// then, get assignments changes from Redis
storage.assignmentsStream.map(_ -> false)
_ <- assignmentStream.mapZIO { case (assignmentsOpt, fromShardManager) =>
updateAssignments(assignmentsOpt, fromShardManager) *> latch.succeed(()).when(fromShardManager)
}.runDrain
.retry(Schedule.fixed(config.refreshAssignmentsRetryInterval))
.interruptible
.forkDaemon
.withFinalizer(_.interrupt)
_ <- latch.await
} yield ()

private[shardcake] def isShuttingDown: UIO[Boolean] =
isShuttingDownRef.get
Expand Down

0 comments on commit 426e209

Please sign in to comment.