diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 629ac0c4..30f35697 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -51,11 +51,16 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
+    val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceNr, Continue)) {
         case (from, control) =>
           def limitWindow(from: Long): Long = {
-            math.min(from + batchSize, toSequenceNr)
+            if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
+              toSequenceNr
+            } else {
+              Math.min(from + batchSize, toSequenceNr)
+            }
           }
 
           def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index e9e8ccdb..be2679fc 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -50,7 +50,8 @@ abstract class LimitWindowingStreamTest(configFile: String)
   implicit val mat: Materializer = SystemMaterializer(system).materializer
 
   val persistenceId = UUID.randomUUID().toString
-  val payload = 'a'.toByte
+  val writerUuid = UUID.randomUUID().toString
+  val payload = Array.fill(16)('a'.toByte)
   val eventsPerBatch = 1000
   val numberOfInsertBatches = 16
   val totalMessages = numberOfInsertBatches * eventsPerBatch
@@ -58,14 +59,14 @@ abstract class LimitWindowingStreamTest(configFile: String)
     withDao { dao =>
       val lastInsert =
         Source
-          .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
+          .fromIterator(() => (1 to numberOfInsertBatches).iterator)
           .mapAsync(1) { i =>
             val end = i * eventsPerBatch
             val start = end - (eventsPerBatch - 1)
-            log.info(s"batch $i (events from $start to $end")
+            log.info(s"batch $i - events from $start to $end")
             val atomicWrites = (start to end).map { j =>
-              AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
+              AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
             }
             dao.asyncWriteMessages(atomicWrites).map(_ => i)
           }
 
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
index 11242419..efc45ca4 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
@@ -22,6 +22,8 @@ import pekko.persistence.query.Offset
 import pekko.persistence.query.{ EventEnvelope, Sequence }
 import pekko.testkit.TestProbe
 
+import scala.concurrent.Future
+
 abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTestSpec(config) {
   import QueryTestSpec.EventEnvelopeProbeOps
 
@@ -219,6 +221,42 @@ abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTes
       }
     }
   }
+
+  it should "return events when more events than the batch size have been archived" in withActorSystem { implicit system =>
+    import pekko.pattern.ask
+    import system.dispatcher
+    import scala.concurrent.duration._
+
+    val journalOps = new JavaDslJdbcReadJournalOperations(system)
+    val batchSize = readJournalConfig.maxBufferSize
+    withTestActors(replyToMessages = true) { (actor1, _, _) =>
+      def sendMessages(numberOfMessages: Int): Future[Done] = {
+        val futures = for (i <- 1 to numberOfMessages) yield {
+          actor1 ? i
+        }
+        Future.sequence(futures).map(_ => Done)
+      }
+
+      val numberOfEvents = batchSize << 2
+      val archiveEventSum = numberOfEvents >> 1
+      val batch = sendMessages(numberOfEvents)
+
+      // wait for acknowledgement of the batch
+      batch.futureValue
+
+      // and then archive (delete) some of the events
+      val deleteBatch = for (i <- 1 to archiveEventSum) yield {
+        actor1 ? DeleteCmd(i)
+      }
+      // block until all delete commands are processed
+      Future.sequence(deleteBatch).futureValue
+
+      journalOps.withCurrentEventsByPersistenceId()("my-1", 0, Long.MaxValue) { tp =>
+        val allEvents = tp.toStrict(atMost = 10.seconds)
+        allEvents.size shouldBe (numberOfEvents - archiveEventSum)
+      }
+    }
+  }
 }
 
 // Note: these tests use the shared-db configs, the test for all (so not only current) events use the regular db config
diff --git a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
new file mode 100644
index 00000000..6ffcbfbc
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import org.apache.pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest
+import org.apache.pekko.persistence.jdbc.query.{ MysqlCleaner, OracleCleaner, PostgresCleaner, SqlServerCleaner }
+
+class PostgresLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("postgres-application.conf")
+    with PostgresCleaner
+
+class MySQLLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("mysql-application.conf")
+    with MysqlCleaner
+
+class OracleLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("oracle-application.conf")
+    with OracleCleaner
+
+class SqlServerLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("sqlserver-application.conf")
+    with SqlServerCleaner
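
For context on the core change: the rewritten `limitWindow` in `BaseJournalDaoWithReadMessages` no longer caps the first read window at `from + batchSize`. Before this patch, when more than `batchSize` leading events of a persistence id had been deleted (archived), the first window matched no rows and a current-events query completed early without reaching the surviving events; that is the scenario the new `CurrentEventsByPersistenceIdTest` case exercises. The guard also avoids computing `from + batchSize` when the addition would overflow `Long`. Below is a minimal standalone sketch of that guard with a few sanity checks; the object name, the explicit parameter list, and the `main` harness are illustrative scaffolding, not code from this patch.

```scala
// Minimal sketch of the patched windowing guard (illustrative names, not from the patch).
object LimitWindowSketch {

  // Mirrors the logic added to BaseJournalDaoWithReadMessages.limitWindow:
  // the first window runs all the way to toSequenceNr (so deleted leading
  // events cannot starve the stream), and from + batchSize is never computed
  // when the addition would overflow Long.
  def limitWindow(from: Long, firstSequenceNr: Long, batchSize: Int, toSequenceNr: Long): Long =
    if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from)
      toSequenceNr
    else
      Math.min(from + batchSize, toSequenceNr)

  def main(args: Array[String]): Unit = {
    // First window: unbounded up to toSequenceNr, regardless of batchSize.
    assert(limitWindow(1L, 1L, 100, Long.MaxValue) == Long.MaxValue)
    // Later windows: capped at batchSize sequence numbers past `from`.
    assert(limitWindow(101L, 1L, 100, Long.MaxValue) == 201L)
    // Near Long.MaxValue the old `from + batchSize` wrapped to a negative
    // value; the guard now falls back to toSequenceNr instead.
    assert(limitWindow(Long.MaxValue - 10, 1L, 100, Long.MaxValue) == Long.MaxValue)
  }
}
```

Note that the window only bounds sequence numbers: the number of rows fetched per round trip is still limited by the `batchSize` argument that `messagesWithBatch` passes to the DAO's `messages` query, so leaving the upper bound open does not make any single query unbounded.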