Skip to content

Commit

Permalink
add batchindex to messageid
Browse files Browse the repository at this point in the history
  • Loading branch information
KannarFr committed Aug 27, 2024
1 parent d0bbafc commit 105d2d4
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class CatsAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfter
val r = t.unsafeRunSync()
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sealed trait MessageId {
def ledgerId: Option[Long]
def entryId: Option[Long]
def partitionIndex: Option[Int]
def batchIndex: Option[Int]
}

private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends MessageId {
Expand Down Expand Up @@ -48,6 +49,11 @@ private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends Message
case m: TopicMessageIdImpl => Option(m.getPartitionIndex)
case _ => None
}
override def batchIndex: Option[Int] = underlying match {
case m: MessageIdImpl => Some(m.getBatchIndex)
case m: TopicMessageIdImpl => Option(m.getBatchIndex)
case _ => None
}
}

object MessageId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class MonixAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfte
val r = Await.result(rFuture, Duration.Inf)
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ScalazAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAft
val r = t.unsafePerformSync
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ZioAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterA
val r = Unsafe.unsafe(implicit unsafe => zio.Runtime.default.unsafe.run(t.either.map(_.toOption.get)).getOrThrowFiberFailure())
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}

Expand Down

0 comments on commit 105d2d4

Please sign in to comment.