diff --git a/build.sbt b/build.sbt index 16f4703c..5dd15bd0 100644 --- a/build.sbt +++ b/build.sbt @@ -8,34 +8,34 @@ def publishVersion = if (isRelease) releaseVersion else if (isGithubActions) "2. val org = "com.clever-cloud.pulsar4s" val AkkaStreamVersion = "2.6.20" // compatible with Akka 2.5.x and 2.6.x -val CatsEffectVersion = "3.5.3" +val CatsEffectVersion = "3.5.4" val CirceVersion = "0.14.6" val CommonsIoVersion = "2.4" val ExtsVersion = "1.61.1" val JacksonVersion = "2.17.2" -val Log4jVersion = "2.22.1" +val Log4jVersion = "2.23.1" val MonixVersion = "3.4.1" val PekkoStreamVersion = "1.0.2" -val PlayJsonVersion = "2.10.4" -val PulsarVersion = "3.2.0" +val PlayJsonVersion = "2.10.6" +val PulsarVersion = "3.3.1" val ReactiveStreamsVersion = "1.0.2" -val FunctionalStreamsVersion = "3.9.4" +val FunctionalStreamsVersion = "3.10.2" val Json4sVersion = "4.0.7" // Version of Avro4s for Scala 2.X -val Avro4sVersionFor2 = "4.1.1" +val Avro4sVersionFor2 = "4.1.2" // Version of Avro4s for Scala 3.X -val Avro4sVersionFor3 = "5.0.9" -val ScalaVersion = "3.3.1" -val ScalatestVersion = "3.2.17" -val ScalazVersion = "7.2.35" -val Slf4jVersion = "2.0.11" +val Avro4sVersionFor3 = "5.0.13" +val ScalaVersion = "3.3.3" +val ScalatestVersion = "3.2.19" +val ScalazVersion = "7.2.36" +val Slf4jVersion = "2.0.16" val SprayJsonVersion = "1.3.6" -val ZIOVersion = "2.0.21" -val ZIOInteropCatsVersion = "23.0.03" +val ZIOVersion = "2.0.22" +val ZIOInteropCatsVersion = "23.1.0.3" lazy val commonScalaVersionSettings = Seq( scalaVersion := ScalaVersion, - crossScalaVersions := Seq("2.12.18", "2.13.12", ScalaVersion) + crossScalaVersions := Seq("2.12.19", "2.13.14", ScalaVersion) ) lazy val warnUnusedImport = Seq( diff --git a/project/build.properties b/project/build.properties index abbbce5d..04267b14 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.8 +sbt.version=1.9.9 diff --git a/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala b/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala index 29633b3d..5aa36810 100644 --- a/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala +++ b/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala @@ -54,6 +54,7 @@ class CatsAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfter val r = t.unsafeRunSync() r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala index eedc26a4..719e20aa 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala @@ -18,6 +18,7 @@ sealed trait MessageId { def ledgerId: Option[Long] def entryId: Option[Long] def partitionIndex: Option[Int] + def batchIndex: Option[Int] } private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends MessageId { @@ -35,14 +36,22 @@ private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends Message } override def ledgerId: Option[Long] = underlying match { case m: MessageIdImpl => Option(m.getLedgerId) + case m: TopicMessageIdImpl => Option(m.getLedgerId) case _ => None } override def entryId: Option[Long] = underlying match { case m: MessageIdImpl => Option(m.getEntryId) + case m: TopicMessageIdImpl => Option(m.getEntryId) case _ => None } override def partitionIndex: Option[Int] = underlying match { case m: MessageIdImpl => Some(m.getPartitionIndex) + case m: TopicMessageIdImpl => Option(m.getPartitionIndex) + case _ => None + } + override def batchIndex: Option[Int] = underlying match { + case m: MessageIdImpl => Some(m.getBatchIndex) + case m: TopicMessageIdImpl => Option(m.getBatchIndex) case _ => None } } diff --git a/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala b/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala index 583472e5..54faff18 100644 --- a/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala +++ b/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala @@ -56,6 +56,7 @@ class MonixAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfte val r = Await.result(rFuture, Duration.Inf) r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() } diff --git a/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala b/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala index 6712c1e7..313ae007 100644 --- a/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala +++ b/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala @@ -47,6 +47,7 @@ class ScalazAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAft val r = t.unsafePerformSync r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() } } diff --git a/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala b/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala index e02e370a..43819d19 100644 --- a/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala +++ b/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala @@ -53,6 +53,7 @@ class ZioAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterA val r = Unsafe.unsafe(implicit unsafe => zio.Runtime.default.unsafe.run(t.either.map(_.toOption.get)).getOrThrowFiberFailure()) r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() }