Skip to content

Commit

Permalink
Fix promise leak when using streaming with long-running actors (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored Jul 23, 2024
1 parent 9f1eb4a commit 2a5d6c5
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,12 @@ class Sharding private (
})

private[shardcake] def initReply(id: String, replyChannel: ReplyChannel[Nothing]): UIO[Unit] =
replyChannels.update(_.updated(id, replyChannel)) <*
replyChannel.await.ensuring(replyChannels.update(_ - id)).forkDaemon
replyChannels
.getAndUpdate(_.updated(id, replyChannel))
.flatMap(beforeReplyChannels =>
replyChannel.await.ensuring(replyChannels.update(_ - id)).forkDaemon.unless(beforeReplyChannels.contains(id))
)
.unit

def reply[Reply](reply: Reply, replier: Replier[Reply]): UIO[Unit] =
replyChannels
Expand Down

0 comments on commit 2a5d6c5

Please sign in to comment.