Skip to content

Commit

Permalink
support scala 2.12 (#129)
Browse files Browse the repository at this point in the history
* support scala 2.12

* make some adjustments to the 2.12 compiler wrangling

* run scalafmt

* also scalafmtSbt because that's not included in scalafmtAll

* add github action step to run tests on 2.12

* sort gha scala versions

* satisfy our desire for order
  • Loading branch information
WtrLmmrs authored May 31, 2024
1 parent 5491e2d commit 96ce5a3
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
fail-fast: false
matrix:
java: ['[email protected]']
scala: ['2.13.13', '3.3.3']
scala: ['2.12.18', '2.13.13', '3.3.3']
steps:
- name: Checkout current branch
uses: actions/[email protected]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.Config
import com.devsisters.shardcake.Server.Message.Ping
import com.devsisters.shardcake.Server.PingPongEntity
import zio._
import zio.{ Config => _, _ }

object Client {
// self host should not be `localhost` to avoid optimization
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.Config
import com.devsisters.shardcake.interfaces.Storage
import zio._
import zio.{ Config => _, _ }
import zio.stream.ZStream

import java.util.concurrent.ForkJoinPool
Expand Down
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
val scala212 = "2.12.18"
val scala213 = "2.13.13"
val scala3 = "3.3.3"
val allScala = Seq(scala213, scala3)
val allScala = Seq(scala212, scala213, scala3)

val zioVersion = "2.1.1"
val zioGrpcVersion = "0.6.2"
Expand All @@ -14,6 +15,7 @@ val redis4catsVersion = "1.5.2"
val redissonVersion = "3.27.1"
val scalaKryoVersion = "1.0.2"
val testContainersVersion = "0.41.3"
val scalaCompatVersion = "2.12.0"

inThisBuild(
List(
Expand Down Expand Up @@ -69,8 +71,9 @@ lazy val core = project
.settings(
libraryDependencies ++=
Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaCompatVersion
)
)

Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/com/devsisters/shardcake/PodAddress.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.devsisters.shardcake

import scala.collection.compat._

case class PodAddress(host: String, port: Int) {
override def toString: String = s"$host:$port"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class Sharding private (
(shardManager.notifyUnhealthyPod(pod) *>
// just in case we missed the update from the pubsub, refresh assignments
shardManager.getAssignments
.flatMap(updateAssignments(_, fromShardManager = true))).forkDaemon
.flatMap[Any, Throwable, Unit](updateAssignments(_, fromShardManager = true))).forkDaemon
)
}

Expand Down Expand Up @@ -337,7 +337,8 @@ class Sharding private (
}

def sendStream(entityId: String)(messages: ZStream[Any, Throwable, Msg]): Task[Unit] = {
val send = ReplyChannel.single[Unit].flatMap(sendStreamGeneric(entityId, messages, None, _))
val send =
ReplyChannel.single[Unit].flatMap[Any, Throwable, Unit](sendStreamGeneric(entityId, messages, None, _))
timeout.fold(send)(t => send.timeout(t).unit)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ private[shardcake] object ReplyChannel {
def fail(cause: Cause[Throwable]): UIO[Unit] = promise.failCause(cause).unit
def replySingle(a: A): UIO[Unit] = promise.succeed(Some(a)).unit
def replyStream(stream: ZStream[Any, Throwable, A]): UIO[Unit] =
stream.runHead.flatMap(promise.succeed(_)).catchAllCause(fail).fork.unit
val output: Task[Option[A]] = promise.await.onError(fail)
stream.runHead
.flatMap(promise.succeed(_).unit)
.catchAllCause[Any, Nothing, Unit](fail)
.fork
.unit

val output: Task[Option[A]] = promise.await.onError(fail)
}

def single[A]: UIO[FromPromise[A]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ object ShardingSpec extends ZIOSpecDefault {
_ <- Sharding.registerSingleton("singleton", p.succeed(()) *> ZIO.never)
_ <- Sharding.registerScoped
res <- p.await
} yield assertTrue(res == ())
} yield assertTrue(() == res)
}
},
test("Send stream") {
Expand Down
2 changes: 2 additions & 0 deletions examples/src/main/scala/example/complex/GuildApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import example.complex.GuildBehavior.GuildMessage.{ Join, Terminate }
import example.complex.GuildBehavior._
import zio.{ Config => _, _ }

import scala.collection.compat._

object GuildApp extends ZIOAppDefault {
val config: ZLayer[Any, SecurityException, Config] =
ZLayer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import zio._
import zio.stream.ZStream

import scala.annotation.tailrec
import scala.collection.compat._

/**
* A component in charge of assigning and unassigning shards to/from pods
Expand Down Expand Up @@ -106,7 +107,7 @@ class ShardManager(
.ping(pod)
.timeout(config.pingTimeout)
.someOrFailException
.fold(_ => Set(pod), _ => Set.empty)
.fold(_ => Set(pod), _ => Set.empty[PodAddress])
)
.map(_.flatten)
shardsToRemove =
Expand All @@ -128,7 +129,7 @@ class ShardManager(
)
}
.map(_.unzip)
.map { case (pods, shards) => (pods.flatten.toSet, shards.flatten.toSet) }
.map { case (pods, shards) => (pods.flatten[PodAddress].toSet, shards.flatten[ShardId].toSet) }
(failedUnassignedPods, failedUnassignedShards) = failed
// remove assignments of shards that couldn't be unassigned, as well as faulty pods
filteredAssignments = (readyAssignments -- failedUnassignedPods).map { case (pod, shards) =>
Expand All @@ -147,7 +148,7 @@ class ShardManager(
eventsHub.publish(ShardingEvent.ShardsAssigned(pod, shards)).as(Set.empty)
)
}
.map(_.flatten.toSet)
.map(_.flatten[PodAddress].toSet)
failedPods = failedPingedPods ++ failedUnassignedPods ++ failedAssignedPods
// check if failing pods are still up
_ <- ZIO.foreachDiscard(failedPods)(notifyUnhealthyPod).forkDaemon
Expand Down Expand Up @@ -316,7 +317,9 @@ object ShardManager {
): (Map[PodAddress, Set[ShardId]], Map[PodAddress, Set[ShardId]]) = {
val (_, assignments) = shardsToRebalance.foldLeft((state.shardsPerPod, List.empty[(ShardId, PodAddress)])) {
case ((shardsPerPod, assignments), shard) =>
val unassignedPods = assignments.flatMap { case (shard, _) => state.shards.get(shard).flatten }.toSet
val unassignedPods = assignments.flatMap { case (shard, _) =>
state.shards.get(shard).flatten[PodAddress]
}.toSet
// find pod with least amount of shards
shardsPerPod
// keep only pods with the max version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import zio.stream.ZStream
import zio.stream.interop.fs2z._
import zio.{ Task, ZIO, ZLayer }

import scala.collection.compat._

object StorageRedis {
type fs2Stream[A] = fs2.Stream[Task, A]
type Redis = RedisCommands[Task, String, String] with PubSubCommands[fs2Stream, String, String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import org.redisson.client.codec.StringCodec
import zio.stream.ZStream
import zio.{ Queue, Task, Unsafe, ZIO, ZLayer }

import scala.collection.compat._

object StorageRedis {

/**
Expand Down

0 comments on commit 96ce5a3

Please sign in to comment.