Skip to content

Commit

Permalink
Expose terminateLocalEntity (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Nov 29, 2023
1 parent 879d4c6 commit 390605f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
15 changes: 15 additions & 0 deletions entities/src/main/scala/com/devsisters/shardcake/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,12 @@ class Sharding private (
.catchAllCause(replyChannel.fail)
_ <- entityStates.update(_.updated(recipientType.name, EntityState(entityManager, processBinary)))
} yield ()

def terminateLocalEntity(entityType: EntityType[_], entityId: String): UIO[Unit] =
entityStates.get.flatMap(_.get(entityType.name) match {
case Some(state) => state.entityManager.terminateEntity(entityId)
case None => ZIO.unit
})
}

object Sharding {
Expand Down Expand Up @@ -547,4 +553,13 @@ object Sharding {
*/
def getPods: RIO[Sharding, Set[PodAddress]] =
ZIO.serviceWithZIO[Sharding](_.getPods)

/**
* Terminate a given entity. If a termination message was provided, that message will be sent to the entity and
* no new message will be enqueued after that. If no termination message was provided, the entity will be stopped immediately.
* This method can only be used if the entity is hosted on the current pod (otherwise it will do nothing).
* Typically, you would use this method from inside the entity behavior to stop itself.
*/
def terminateLocalEntity(entityType: EntityType[_], entityId: String): URIO[Sharding, Unit] =
ZIO.serviceWithZIO[Sharding](_.terminateLocalEntity(entityType, entityId))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ private[shardcake] trait EntityManager[-Req] {
replyId: Option[String],
replyChannel: ReplyChannel[Nothing]
): IO[EntityNotManagedByThisPod, Unit]
def terminateEntity(entityId: String): UIO[Unit]
def terminateEntitiesOnShards(shards: Set[ShardId]): UIO[Unit]
def terminateAllEntities: UIO[Unit]
}
Expand Down Expand Up @@ -46,7 +47,7 @@ private[shardcake] object EntityManager {
private val currentTimeInMilliseconds: UIO[EpochMillis] =
Clock.currentTime(TimeUnit.MILLISECONDS)

class EntityManagerLive[Req](
private class EntityManagerLive[Req](
recipientType: RecipientType[Req],
behavior: (String, Queue[Req]) => Task[Nothing],
terminateMessage: Signal => Option[Req],
Expand Down Expand Up @@ -75,7 +76,7 @@ private[shardcake] object EntityManager {
} yield ()).interruptible.forkDaemon
}

private def terminateEntity(entityId: String): UIO[Unit] =
def terminateEntity(entityId: String): UIO[Unit] =
entities.updateZIO(map =>
map.get(entityId) match {
case Some(Left(queue)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object ShardingSpec extends ZIOSpecDefault {
_ <- Clock.sleep(1 second)
c1 <- counter.send("c1")(GetCounter.apply)
c2 <- counter.send("c2")(GetCounter.apply)
} yield assertTrue(c1 == 2) && assertTrue(c2 == 1)
} yield assertTrue(c1 == 2, c2 == 1)
}
},
test("Streaming") {
Expand All @@ -34,7 +34,7 @@ object ShardingSpec extends ZIOSpecDefault {
_ <- Sharding.registerEntity(Counter, behavior)
_ <- Sharding.registerScoped
counter <- Sharding.messenger(Counter)
stream <- counter.sendStream("c1")(StreamingChanges(_))
stream <- counter.sendStream("c1")(StreamingChanges.apply)
latch <- Promise.make[Nothing, Unit]
fiber <- stream.take(5).tap(_ => latch.succeed(())).runCollect.fork
_ <- latch.await
Expand All @@ -52,7 +52,7 @@ object ShardingSpec extends ZIOSpecDefault {
_ <- Sharding.registerEntity(Counter, behavior)
_ <- Sharding.registerScoped
counter <- Sharding.messenger(Counter)
stream <- counter.sendStream("c1")(StreamingChanges(_))
stream <- counter.sendStream("c1")(StreamingChanges.apply)
latch <- Promise.make[Nothing, Unit]
fiber <- stream.take(2).tap(_ => latch.succeed(())).runCollect.fork
_ <- latch.await
Expand All @@ -75,6 +75,19 @@ object ShardingSpec extends ZIOSpecDefault {
} yield assertTrue(c0 == 1, c1 == 0)
}
},
test("Entity manual termination") {
ZIO.scoped {
for {
_ <- Sharding.registerEntity(Counter, behavior)
_ <- Sharding.registerScoped
counter <- Sharding.messenger(Counter)
_ <- counter.sendDiscard("c3")(IncrementCounter)
c0 <- counter.send("c3")(GetCounter.apply)
_ <- counter.send("c3")(TriggerTerminate.apply)
c1 <- counter.send("c3")(GetCounter.apply) // counter should be restarted
} yield assertTrue(c0 == 1, c1 == 0)
}
},
test("Entity termination with extension") {
ZIO.scoped {
for {
Expand Down Expand Up @@ -119,6 +132,7 @@ object CounterActor {
case object IncrementCounter extends CounterMessage
case object DecrementCounter extends CounterMessage
case class StreamingChanges(replier: StreamReplier[Int]) extends CounterMessage
case class TriggerTerminate(replier: Replier[Unit]) extends CounterMessage
}

object Counter extends EntityType[CounterMessage]("counter")
Expand All @@ -134,6 +148,8 @@ object CounterActor {
case CounterMessage.DecrementCounter => state.update(_ - 1)
case CounterMessage.StreamingChanges(replier) =>
replier.replyStream(state.changes.ensuring(state.set(-1)))
case CounterMessage.TriggerTerminate(replier) =>
Sharding.terminateLocalEntity(Counter, entityId).ensuring(replier.reply(()))
}.forever
)
}

0 comments on commit 390605f

Please sign in to comment.