Skip to content

Commit

Permalink
run scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
WtrLmmrs committed May 31, 2024
1 parent 2f9c988 commit 5ce32fe
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.devsisters.shardcake

import com.devsisters.shardcake.Server.Message.Ping
import com.devsisters.shardcake.Server.PingPongEntity
import zio.{Config => _, _}
import zio.{ Config => _, _ }

object Client {
// self host should not be `localhost` to avoid optimization
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Storage
import zio.{Config => _, _}
import zio.{ Config => _, _ }
import zio.stream.ZStream

import java.util.concurrent.ForkJoinPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ class Sharding private (
}

def sendStream(entityId: String)(messages: ZStream[Any, Throwable, Msg]): Task[Unit] = {
val send = ReplyChannel.single[Unit].flatMap[Any, Throwable, Unit](sendStreamGeneric(entityId, messages, None, _))
val send =
ReplyChannel.single[Unit].flatMap[Any, Throwable, Unit](sendStreamGeneric(entityId, messages, None, _))
timeout.fold(send)(t => send.timeout(t).unit)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ private[shardcake] object ReplyChannel {
def fail(cause: Cause[Throwable]): UIO[Unit] = promise.failCause(cause).unit
def replySingle(a: A): UIO[Unit] = promise.succeed(Some(a)).unit
def replyStream(stream: ZStream[Any, Throwable, A]): UIO[Unit] =
stream.runHead.flatMap(promise.succeed(_).unit)
stream.runHead
.flatMap(promise.succeed(_).unit)
.catchAllCause[Any, Nothing, Unit](fail)
.fork.unit
.fork
.unit

val output: Task[Option[A]] = promise.await.onError(fail)
val output: Task[Option[A]] = promise.await.onError(fail)
}

def single[A]: UIO[FromPromise[A]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ object ShardManager {
): (Map[PodAddress, Set[ShardId]], Map[PodAddress, Set[ShardId]]) = {
val (_, assignments) = shardsToRebalance.foldLeft((state.shardsPerPod, List.empty[(ShardId, PodAddress)])) {
case ((shardsPerPod, assignments), shard) =>
val unassignedPods = assignments.flatMap { case (shard, _) => state.shards.get(shard).flatten[PodAddress] }.toSet
val unassignedPods = assignments.flatMap { case (shard, _) =>
state.shards.get(shard).flatten[PodAddress]
}.toSet
// find pod with least amount of shards
shardsPerPod
// keep only pods with the max version
Expand Down

0 comments on commit 5ce32fe

Please sign in to comment.